iht commented on code in PR #32060:
URL: https://github.com/apache/beam/pull/32060#discussion_r1755173582


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  .equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));
+
+      // Store the current window used by the input
+      PCollection<Solace.PublishResult> captureWindow =
+          records.apply(
+              "Capture window", ParDo.of(new UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+              captureWindow.getWindowingStrategy();
+
+      PCollection<Solace.Record> withGlobalWindow =
+          records.apply("Global window", Window.into(new GlobalWindows()));
+
+      PCollection<KV<Integer, Solace.Record>> withShardKeys =
+          withGlobalWindow.apply(
+              "Add shard key",
+              ParDo.of(new UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+      String label =
+          getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : "Publish (batched)";
+
+      PCollectionTuple solaceOutput = withShardKeys.apply(label, getWriterTransform(destinationFn));
+
+      SolaceOutput output;
+      if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+        PCollection<Solace.PublishResult> failedPublish = solaceOutput.get(FAILED_PUBLISH_TAG);
+        PCollection<Solace.PublishResult> successfulPublish =
+            solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+        output =
+            rewindow(
+                SolaceOutput.in(input.getPipeline(), failedPublish, successfulPublish),
+                windowingStrategy);
+      } else {
+        LOG.info(
+            String.format(
+                "Solace.Write: omitting writer output because delivery mode is %s",
+                getDeliveryMode()));
+        output = SolaceOutput.in(input.getPipeline(), null, null);
+      }
+
+      return output;

Review Comment:
   But that's not possible unless we accumulate the processed `Solace.Record`s, 
check all the responses, match each response to its incoming `Solace.Record`, 
and put the right record in the right place. I think that approach would be too 
heavyweight. The replies are received via "callbacks" (actually the client 
polling for responses, as implemented in the Solace client libraries) by any 
thread in any worker (potentially a worker different from the one that sent the 
original message). So doing this would require shuffling too.
   
   The reply from Solace only contains some metadata (id, timestamp), and that 
is the information the connector returns, which is a much simpler process.
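
   To make the cost concrete, here is a minimal sketch of the bookkeeping that 
matching replies to records would need. Everything in it is hypothetical 
(`PendingRecordTracker`, `onSend`, `onReply`, and the use of a plain string 
message id are assumptions for illustration, not part of `SolaceIO` or the 
Solace client API):

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch only; these names do not exist in SolaceIO. */
final class PendingRecordTracker<RecordT> {
  // Every in-flight record must be held in memory until its reply arrives.
  private final Map<String, RecordT> pending = new ConcurrentHashMap<>();

  /** Called by the sending thread just before the message is published. */
  void onSend(String messageId, RecordT record) {
    pending.put(messageId, record);
  }

  /**
   * Called by whichever thread receives the reply. Returns empty when the
   * record was registered somewhere else (for example, in another worker).
   */
  Optional<RecordT> onReply(String messageId) {
    return Optional.ofNullable(pending.remove(messageId));
  }

  /** Number of records still waiting for a reply. */
  int inFlight() {
    return pending.size();
  }
}
```

   A map like this only resolves replies that arrive in the same process that 
sent the message. If a reply can land in a different worker, the lookup comes 
back empty and the records would have to be shuffled to where the reply is, 
which is the extra cost described above.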



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
