sjvanrossum commented on code in PR #32060:
URL: https://github.com/apache/beam/pull/32060#discussion_r1755746169


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = 
checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> 
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be non-null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));
+
+      // Store the current window used by the input
+      PCollection<Solace.PublishResult> captureWindow =
+          records.apply(
+              "Capture window", ParDo.of(new 
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy 
=
+          (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+              captureWindow.getWindowingStrategy();
+
+      PCollection<Solace.Record> withGlobalWindow =
+          records.apply("Global window", Window.into(new GlobalWindows()));
+
+      PCollection<KV<Integer, Solace.Record>> withShardKeys =
+          withGlobalWindow.apply(
+              "Add shard key",
+              ParDo.of(new 
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+      String label =
+          getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : 
"Publish (batched)";
+
+      PCollectionTuple solaceOutput = withShardKeys.apply(label, 
getWriterTransform(destinationFn));
+

Review Comment:
   I've confused the docs and the classes introduced by this connector. The 
docs state that session-dependent message are created by calling 
`create*Message` on a `Producer` and session-independent messages are created 
by calling `create*Message` on the `JCSMPFactory`. The messages produced by 
this connector are created by the `MessageProducer` class of this connector. 
It's calling out to `JCSMPFactory`, so I see that this connector is indeed 
creating session-independent messages.
   
   Going back over my comment then, the bit about `GroupIntoBatches` is still 
relevant although likely best addressed in a follow-up PR. As an improvement to 
this PR I think we can still remove the state variables that limit the DoFn's 
parallelism since producer instances are thread-safe and manage the connection 
to Solace on a separate I/O thread, so Beam's internal parallelism should have 
no external effect as long as the producer is shared between instances of a 
unique transform. Did you by chance experiment with sharing a producer per 
unique transform configuration or per unique connection configuration instead 
of the fixed pool of 100 producers? If multiple producers improve throughput, 
then would it make sense to allow users to configure the size of the producer 
pool?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to