sjvanrossum commented on code in PR #32060:
URL: https://github.com/apache/beam/pull/32060#discussion_r1755746169
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
@Override
public SolaceOutput expand(PCollection<T> input) {
- // TODO: will be sent in upcoming PR
- return SolaceOutput.in(input.getPipeline(), null, null);
+ Class<? super T> pcollClass = checkNotNull(input.getTypeDescriptor()).getRawType();
+ boolean usingSolaceRecord =
+ pcollClass
+ .getTypeName()
+ .equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+ || pcollClass.isAssignableFrom(Solace.Record.class);
+
+ validateWriteTransform(usingSolaceRecord);
+
+ boolean usingDynamicDestinations = getDestination() == null;
+ SerializableFunction<Solace.Record, Destination> destinationFn;
+ if (usingDynamicDestinations) {
+ destinationFn = x -> SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+ } else {
+ // Constant destination for all messages (same topic or queue)
+ // This should be non-null, as nulls would have been flagged by the
+ // validateWriteTransform method
+ destinationFn = x -> checkNotNull(getDestination());
+ }
+
+ @SuppressWarnings("unchecked")
+ PCollection<Solace.Record> records =
+ getFormatFunction() == null
+ ? (PCollection<Solace.Record>) input
+ : input.apply(
+ "Format records",
+ MapElements.into(TypeDescriptor.of(Solace.Record.class))
+ .via(checkNotNull(getFormatFunction())));
+
+ // Store the current window used by the input
+ PCollection<Solace.PublishResult> captureWindow =
+ records.apply(
+ "Capture window", ParDo.of(new UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy =
+ (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+ captureWindow.getWindowingStrategy();
+
+ PCollection<Solace.Record> withGlobalWindow =
+ records.apply("Global window", Window.into(new GlobalWindows()));
+
+ PCollection<KV<Integer, Solace.Record>> withShardKeys =
+ withGlobalWindow.apply(
+ "Add shard key",
+ ParDo.of(new UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+ String label =
+ getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : "Publish (batched)";
+
+ PCollectionTuple solaceOutput = withShardKeys.apply(label, getWriterTransform(destinationFn));
+
Review Comment:
I had confused the docs with the classes introduced by this connector. The
docs state that session-dependent messages are created by calling
`create*Message` on a `Producer`, while session-independent messages are
created by calling `create*Message` on the `JCSMPFactory`. The messages
produced by this connector are created by the connector's own `MessageProducer`
class, which calls out to `JCSMPFactory`, so this connector is indeed creating
session-independent messages.
Going back over my comment, then: the point about `GroupIntoBatches` is still
relevant, although it is likely best addressed in a follow-up PR. As an
improvement to this PR, I think we can still remove the state variables that
limit the DoFn's parallelism. Producer instances are thread-safe and manage the
connection to Solace on a separate I/O thread, so Beam's internal parallelism
should have no external effect as long as the producer is shared between
instances of a unique transform. Did you by chance experiment with sharing a
producer per unique transform configuration, or per unique connection
configuration, instead of using the fixed pool of 100 producers? If multiple
producers improve throughput, would it make sense to let users configure the
size of the producer pool?
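To make the last question concrete, here is a minimal sketch of sharing producers per unique connection configuration, with the pool size as a user-supplied knob instead of a fixed 100. `SharedProducerPools`, `ConnectionConfig`, `Producer`, and the `poolSize` parameter are hypothetical stand-ins, not classes from this PR or the JCSMP API. The sketch assumes, as stated above, that producer instances are thread-safe, so one JVM-wide pool can serve every DoFn instance that uses the same configuration.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * Hypothetical sketch: a JVM-wide cache holding one pool of (assumed thread-safe)
 * producers per unique connection configuration, shared by all DoFn instances.
 */
final class SharedProducerPools {

  /** Hypothetical stand-in for the settings that identify a Solace session. */
  static final class ConnectionConfig {
    private final String host;
    private final String vpn;
    private final String username;

    ConnectionConfig(String host, String vpn, String username) {
      this.host = host;
      this.vpn = vpn;
      this.username = username;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof ConnectionConfig)) {
        return false;
      }
      ConnectionConfig other = (ConnectionConfig) o;
      return Objects.equals(host, other.host)
          && Objects.equals(vpn, other.vpn)
          && Objects.equals(username, other.username);
    }

    @Override
    public int hashCode() {
      return Objects.hash(host, vpn, username);
    }
  }

  /** Hypothetical stand-in for a thread-safe producer (not the JCSMP interface). */
  interface Producer {
    void send(String destination, byte[] payload);
  }

  /** A fixed-size group of producers handed out round-robin. */
  private static final class Pool {
    private final Producer[] producers;
    private final AtomicInteger next = new AtomicInteger();

    Pool(int size, ConnectionConfig config, Function<ConnectionConfig, Producer> factory) {
      producers = new Producer[size];
      for (int i = 0; i < size; i++) {
        producers[i] = factory.apply(config);
      }
    }

    Producer pick() {
      // floorMod keeps the index non-negative even after the counter overflows.
      return producers[Math.floorMod(next.getAndIncrement(), producers.length)];
    }
  }

  private static final Map<ConnectionConfig, Pool> POOLS = new ConcurrentHashMap<>();

  private SharedProducerPools() {}

  /**
   * Returns a producer for {@code config}, creating its pool on first use. The pool size
   * passed on the first call for a given configuration wins; later sizes are ignored.
   */
  static Producer get(
      ConnectionConfig config, int poolSize, Function<ConnectionConfig, Producer> factory) {
    if (poolSize < 1) {
      throw new IllegalArgumentException("poolSize must be at least 1");
    }
    return POOLS.computeIfAbsent(config, c -> new Pool(poolSize, c, factory)).pick();
  }
}
```

A pool size of 1 collapses this to a single shared producer per connection configuration, which is the first option asked about; larger sizes would only be worth exposing if benchmarks show that multiple producers actually improve throughput. Keying by transform configuration instead would just mean using a different key type.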