twosom commented on code in PR #32470:
URL: https://github.com/apache/beam/pull/32470#discussion_r1778733342


##########
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java:
##########
@@ -538,54 +597,80 @@ public Write 
withConnectionConfiguration(ConnectionConfiguration configuration)
      * @param retained Whether or not the messaging engine should retain the 
message.
      * @return The {@link Write} {@link PTransform} with the corresponding 
retained configuration.
      */
-    public Write withRetained(boolean retained) {
+    public Write<InputT> withRetained(boolean retained) {
       return builder().setRetained(retained).build();
     }
 
-    @Override
-    public PDone expand(PCollection<byte[]> input) {
-      input.apply(ParDo.of(new WriteFn(this)));
-      return PDone.in(input.getPipeline());
-    }
-
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       connectionConfiguration().populateDisplayData(builder);
       builder.add(DisplayData.item("retained", retained()));
     }
 
-    private static class WriteFn extends DoFn<byte[], Void> {
+    @Override
+    public PDone expand(PCollection<InputT> input) {
+      checkArgument(connectionConfiguration() != null, 
"connectionConfiguration can not be null");
+      final SerializableFunction<InputT, String> topicFn;
+      if (dynamic()) {
+        checkArgument(
+            connectionConfiguration().getTopic() == null, "DynamicWrite can 
not have static topic");
+        topicFn = topicFn();
+      } else {
+        checkArgument(connectionConfiguration().getTopic() != null, "topic can 
not be null");
+        final String topic = connectionConfiguration().getTopic();
+        topicFn = ignore -> topic;
+      }
+
+      checkArgument(topicFn != null, "topicFn can not be null");
+      checkArgument(payloadFn() != null, "payloadFn can not be null");
+
+      input.apply(
+          ParDo.of(new WriteFn<>(connectionConfiguration(), topicFn, 
payloadFn(), retained())));
+      return PDone.in(input.getPipeline());
+    }
 
-      private final Write spec;

Review Comment:
   Oh, that's correct! Thanks for the feedback! ^_^



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to