Abacn commented on code in PR #32470:
URL: https://github.com/apache/beam/pull/32470#discussion_r1777521944
##########
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java:
##########
@@ -538,54 +597,80 @@ public Write
withConnectionConfiguration(ConnectionConfiguration configuration)
* @param retained Whether or not the messaging engine should retain the
message.
* @return The {@link Write} {@link PTransform} with the corresponding
retained configuration.
*/
- public Write withRetained(boolean retained) {
+ public Write<InputT> withRetained(boolean retained) {
return builder().setRetained(retained).build();
}
- @Override
- public PDone expand(PCollection<byte[]> input) {
- input.apply(ParDo.of(new WriteFn(this)));
- return PDone.in(input.getPipeline());
- }
-
@Override
public void populateDisplayData(DisplayData.Builder builder) {
connectionConfiguration().populateDisplayData(builder);
builder.add(DisplayData.item("retained", retained()));
}
- private static class WriteFn extends DoFn<byte[], Void> {
+ @Override
+ public PDone expand(PCollection<InputT> input) {
+ checkArgument(connectionConfiguration() != null,
"connectionConfiguration can not be null");
+ final SerializableFunction<InputT, String> topicFn;
+ if (dynamic()) {
+ checkArgument(
+ connectionConfiguration().getTopic() == null, "DynamicWrite can
not have static topic");
+ topicFn = topicFn();
+ } else {
+ checkArgument(connectionConfiguration().getTopic() != null, "topic can
not be null");
+ final String topic = connectionConfiguration().getTopic();
+ topicFn = ignore -> topic;
+ }
+
+ checkArgument(topicFn != null, "topicFn can not be null");
+ checkArgument(payloadFn() != null, "payloadFn can not be null");
+
+ input.apply(
+ ParDo.of(new WriteFn<>(connectionConfiguration(), topicFn,
payloadFn(), retained())));
+ return PDone.in(input.getPipeline());
+ }
- private final Write spec;
Review Comment:
what is the purpose of this refactor? One can keep "spec" as well as
constructor parameters for the convenience of future extension of configurations
##########
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java:
##########
@@ -538,54 +597,80 @@ public Write
withConnectionConfiguration(ConnectionConfiguration configuration)
* @param retained Whether or not the messaging engine should retain the
message.
* @return The {@link Write} {@link PTransform} with the corresponding
retained configuration.
*/
- public Write withRetained(boolean retained) {
+ public Write<InputT> withRetained(boolean retained) {
return builder().setRetained(retained).build();
}
- @Override
- public PDone expand(PCollection<byte[]> input) {
- input.apply(ParDo.of(new WriteFn(this)));
- return PDone.in(input.getPipeline());
- }
-
@Override
public void populateDisplayData(DisplayData.Builder builder) {
connectionConfiguration().populateDisplayData(builder);
builder.add(DisplayData.item("retained", retained()));
}
- private static class WriteFn extends DoFn<byte[], Void> {
+ @Override
+ public PDone expand(PCollection<InputT> input) {
+ checkArgument(connectionConfiguration() != null,
"connectionConfiguration can not be null");
+ final SerializableFunction<InputT, String> topicFn;
+ if (dynamic()) {
Review Comment:
since getTopic() is or not null corresponds to dynamic() being false and
true, we can remove this property, and just use topic nullness to determine the
scenario?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]