twosom commented on code in PR #32470:
URL: https://github.com/apache/beam/pull/32470#discussion_r1778733342
##########
sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java:
##########
@@ -538,54 +597,80 @@ public Write
withConnectionConfiguration(ConnectionConfiguration configuration)
* @param retained Whether or not the messaging engine should retain the
message.
* @return The {@link Write} {@link PTransform} with the corresponding
retained configuration.
*/
- public Write withRetained(boolean retained) {
+ public Write<InputT> withRetained(boolean retained) {
return builder().setRetained(retained).build();
}
- @Override
- public PDone expand(PCollection<byte[]> input) {
- input.apply(ParDo.of(new WriteFn(this)));
- return PDone.in(input.getPipeline());
- }
-
@Override
public void populateDisplayData(DisplayData.Builder builder) {
connectionConfiguration().populateDisplayData(builder);
builder.add(DisplayData.item("retained", retained()));
}
- private static class WriteFn extends DoFn<byte[], Void> {
+ @Override
+ public PDone expand(PCollection<InputT> input) {
+ checkArgument(connectionConfiguration() != null,
"connectionConfiguration can not be null");
+ final SerializableFunction<InputT, String> topicFn;
+ if (dynamic()) {
+ checkArgument(
+ connectionConfiguration().getTopic() == null, "DynamicWrite can
not have static topic");
+ topicFn = topicFn();
+ } else {
+ checkArgument(connectionConfiguration().getTopic() != null, "topic can
not be null");
+ final String topic = connectionConfiguration().getTopic();
+ topicFn = ignore -> topic;
+ }
+
+ checkArgument(topicFn != null, "topicFn can not be null");
+ checkArgument(payloadFn() != null, "payloadFn can not be null");
+
+ input.apply(
+ ParDo.of(new WriteFn<>(connectionConfiguration(), topicFn,
payloadFn(), retained())));
+ return PDone.in(input.getPipeline());
+ }
- private final Write spec;
Review Comment:
Oh, that's correct! Thanks for the feedback! ^_^
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]