twosom commented on code in PR #32470:
URL: https://github.com/apache/beam/pull/32470#discussion_r1778724730


##########
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java:
##########
@@ -538,54 +597,80 @@ public Write 
withConnectionConfiguration(ConnectionConfiguration configuration)
      * @param retained Whether or not the messaging engine should retain the 
message.
      * @return The {@link Write} {@link PTransform} with the corresponding 
retained configuration.
      */
-    public Write withRetained(boolean retained) {
+    public Write<InputT> withRetained(boolean retained) {
       return builder().setRetained(retained).build();
     }
 
-    @Override
-    public PDone expand(PCollection<byte[]> input) {
-      input.apply(ParDo.of(new WriteFn(this)));
-      return PDone.in(input.getPipeline());
-    }
-
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       connectionConfiguration().populateDisplayData(builder);
       builder.add(DisplayData.item("retained", retained()));
     }
 
-    private static class WriteFn extends DoFn<byte[], Void> {
+    @Override
+    public PDone expand(PCollection<InputT> input) {
+      checkArgument(connectionConfiguration() != null, 
"connectionConfiguration can not be null");
+      final SerializableFunction<InputT, String> topicFn;
+      if (dynamic()) {

Review Comment:
   This would also allow calls like MqttIO.write().withTopicPayloadFn(...), 
since they use the same Write class.
   We don't know if the Write object was called from write() or dynamicWrite(). 
This is not enough to determine the scenario based on whether the topic is null.
   The topic value can be modified externally, but the dynamic value cannot.
   I would like to make sure that withTopicPayloadFn() and withPayloadFn() are 
only available when dynamic is true. What do you think? and please take a look 
a the following test code.
   
   
   ```java
   @Test
     public void testWriteWithTopicFn() {
       IllegalArgumentException exception =
           assertThrows(
               IllegalArgumentException.class, () -> 
MqttIO.write().withTopicFn(e -> "some topic"));
   
       assertEquals("withTopicFn can not use in non-dynamic write", 
exception.getMessage());
     }
   
     @Test
     public void testWriteWithPayloadFn() {
       final IllegalArgumentException exception =
           assertThrows(
               IllegalArgumentException.class, () -> 
MqttIO.write().withPayloadFn(e -> new byte[] {}));
   
       assertEquals("withPayloadFn can not use in non-dynamic write", 
exception.getMessage());
     }
   
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to