sjvanrossum commented on code in PR #32060:
URL: https://github.com/apache/beam/pull/32060#discussion_r1759513137


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = 
checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> 
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be non-null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));
+
+      // Store the current window used by the input
+      PCollection<Solace.PublishResult> captureWindow =
+          records.apply(
+              "Capture window", ParDo.of(new 
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy 
=
+          (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+              captureWindow.getWindowingStrategy();
+
+      PCollection<Solace.Record> withGlobalWindow =
+          records.apply("Global window", Window.into(new GlobalWindows()));
+
+      PCollection<KV<Integer, Solace.Record>> withShardKeys =
+          withGlobalWindow.apply(
+              "Add shard key",
+              ParDo.of(new 
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+      String label =
+          getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : 
"Publish (batched)";
+
+      PCollectionTuple solaceOutput = withShardKeys.apply(label, 
getWriterTransform(destinationFn));
+
+      SolaceOutput output;
+      if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+        PCollection<Solace.PublishResult> failedPublish = 
solaceOutput.get(FAILED_PUBLISH_TAG);
+        PCollection<Solace.PublishResult> successfulPublish =
+            solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+        output =
+            rewindow(
+                SolaceOutput.in(input.getPipeline(), failedPublish, 
successfulPublish),
+                windowingStrategy);
+      } else {
+        LOG.info(
+            String.format(
+                "Solace.Write: omitting writer output because delivery mode is 
%s",
+                getDeliveryMode()));
+        output = SolaceOutput.in(input.getPipeline(), null, null);
+      }
+
+      return output;

Review Comment:
   So I did some digging and found a few undocumented JCSMP methods which are 
used by the new Java API to make the resource ownership flow a little more like 
what I was expecting.
   
   The methods are marked as `@SolReserved` on `Jcsmpsession` and are named 
`createProducer`. If I understand correctly, a session owns the outbound 
connection to Solace and a producer uses that resource to produce messages. 
However, the documented `getProducer` method specifies that it will replace a 
previously returned producer, but I'd have to validate that.
   
   The ownership and lifetime model that you probably want here is one where 
there's a shared, long-lived session per unique transform in a static variable, 
some shared map keyed on the transform configuration, and a producer per 
instance of the transform all multiplexed on the shared session. Every writer 
instance needs its own producer since the callbacks are passed as construction 
arguments to the producer, that's the only way to ensure that responses are 
routed to the originating requestor.
   
   Could you check if repeated calls to `getProducer` invalidate prior return 
values like the docs say? If they do, then we'll likely need to use 
`createProducer` and verify with Solace that it's ok to call undocumented 
public methods on their classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to