sjvanrossum commented on code in PR #32060:
URL: https://github.com/apache/beam/pull/32060#discussion_r1750945244


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = 
checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> 
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be non-null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));
+
+      // Store the current window used by the input
+      PCollection<Solace.PublishResult> captureWindow =
+          records.apply(
+              "Capture window", ParDo.of(new 
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy 
=
+          (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+              captureWindow.getWindowingStrategy();
+
+      PCollection<Solace.Record> withGlobalWindow =
+          records.apply("Global window", Window.into(new GlobalWindows()));
+
+      PCollection<KV<Integer, Solace.Record>> withShardKeys =
+          withGlobalWindow.apply(
+              "Add shard key",
+              ParDo.of(new 
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+      String label =
+          getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : 
"Publish (batched)";
+
+      PCollectionTuple solaceOutput = withShardKeys.apply(label, 
getWriterTransform(destinationFn));
+
+      SolaceOutput output;
+      if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+        PCollection<Solace.PublishResult> failedPublish = 
solaceOutput.get(FAILED_PUBLISH_TAG);
+        PCollection<Solace.PublishResult> successfulPublish =
+            solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+        output =
+            rewindow(
+                SolaceOutput.in(input.getPipeline(), failedPublish, 
successfulPublish),
+                windowingStrategy);
+      } else {
+        LOG.info(
+            String.format(
+                "Solace.Write: omitting writer output because delivery mode is 
%s",
+                getDeliveryMode()));
+        output = SolaceOutput.in(input.getPipeline(), null, null);
+      }
+
+      return output;
+    }
+
+    private ParDo.MultiOutput<KV<Integer, Solace.Record>, 
Solace.PublishResult> getWriterTransform(
+        SerializableFunction<Solace.Record, Destination> destinationFn) {
+
+      ParDo.SingleOutput<KV<Integer, Solace.Record>, Solace.PublishResult> 
writer =
+          ParDo.of(
+              getWriterType() == WriterType.STREAMING
+                  ? new UnboundedStreamingSolaceWriter.WriterDoFn(
+                      destinationFn,
+                      checkNotNull(getSessionServiceFactory()),
+                      getDeliveryMode(),
+                      getDispatchMode(),
+                      getNumberOfClientsPerWorker(),
+                      getPublishLatencyMetrics())
+                  : new UnboundedBatchedSolaceWriter.WriterDoFn(
+                      destinationFn,
+                      checkNotNull(getSessionServiceFactory()),
+                      getDeliveryMode(),
+                      getDispatchMode(),
+                      getNumberOfClientsPerWorker(),
+                      getPublishLatencyMetrics()));
+
+      return writer.withOutputTags(FAILED_PUBLISH_TAG, 
TupleTagList.of(SUCCESSFUL_PUBLISH_TAG));
+    }
+
+    private SolaceOutput rewindow(
+        SolaceOutput solacePublishResult,
+        WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+      PCollection<Solace.PublishResult> correct = 
solacePublishResult.getSuccessfulPublish();
+      PCollection<Solace.PublishResult> failed = 
solacePublishResult.getFailedPublish();
+
+      PCollection<Solace.PublishResult> correctWithWindow = null;
+      PCollection<Solace.PublishResult> failedWithWindow = null;
+
+      if (correct != null) {
+        correctWithWindow = applyOriginalWindow(correct, strategy, "Rewindow 
correct");
+      }
+
+      if (failed != null) {
+        failedWithWindow = applyOriginalWindow(failed, strategy, "Rewindow 
failed");
+      }
+
+      return SolaceOutput.in(
+          solacePublishResult.getPipeline(), failedWithWindow, 
correctWithWindow);
+    }
+
+    private static PCollection<Solace.PublishResult> applyOriginalWindow(
+        PCollection<Solace.PublishResult> pcoll,
+        WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy,
+        String label) {
+      Window<Solace.PublishResult> originalWindow = 
captureWindowDetails(strategy);
+
+      if (strategy.getMode() == 
WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) {
+        originalWindow = originalWindow.accumulatingFiredPanes();
+      } else {
+        originalWindow = originalWindow.discardingFiredPanes();
+      }
+
+      return pcoll.apply(label, originalWindow);
+    }
+
+    private static Window<Solace.PublishResult> captureWindowDetails(
+        WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+      return Window.<Solace.PublishResult>into(strategy.getWindowFn())
+          .withAllowedLateness(strategy.getAllowedLateness())
+          .withOnTimeBehavior(strategy.getOnTimeBehavior())
+          .withTimestampCombiner(strategy.getTimestampCombiner())
+          .triggering(strategy.getTrigger());
+    }
+
+    /**
+     * Called before running the Pipeline to verify this transform is fully 
and correctly specified.
+     */
+    private void validateWriteTransform(boolean usingSolaceRecords) {
+      if (!usingSolaceRecords) {
+        Preconditions.checkArgument(
+            getFormatFunction() != null,
+            "SolaceIO.Write: If you are not using Solace.Record as the input 
type, you"
+                + " must set a format function using withFormatFunction().");
+      }
+
+      Preconditions.checkArgument(
+          getMaxNumOfUsedWorkers() > 0,
+          "SolaceIO.Write: The number of used workers must be positive.");
+      Preconditions.checkArgument(
+          getNumberOfClientsPerWorker() > 0,
+          "SolaceIO.Write: The number of clients per worker must be 
positive.");
+      Preconditions.checkArgument(
+          getDeliveryMode() == DeliveryMode.DIRECT || getDeliveryMode() == 
DeliveryMode.PERSISTENT,
+          String.format(
+              "SolaceIO.Write: Delivery mode must be either DIRECT or 
PERSISTENT. %s"
+                  + " not supported",
+              getDeliveryMode()));
+      if (getPublishLatencyMetrics()) {
+        Preconditions.checkArgument(
+            getDeliveryMode() == DeliveryMode.PERSISTENT,
+            "SolaceIO.Write: Publish latency metrics can only be enabled for 
PERSISTENT"
+                + " delivery mode.");
+      }
+      Preconditions.checkArgument(
+          getSessionServiceFactory() != null,
+          "SolaceIO: You need to pass a session service factory. For basic"
+              + " authentication, you can use 
BasicAuthJcsmpSessionServiceFactory.");

Review Comment:
   
   ```suggestion
         Preconditions.checkNotNull(
             getSessionServiceFactory(),
             "SolaceIO: You need to pass a session service factory. For basic"
                 + " authentication, you can use 
BasicAuthJcsmpSessionServiceFactory.");
   ```



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = 
checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> 
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be non-null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));

Review Comment:
   If you remove Write.withFormatFunction, then use usingSolaceRecord here 
instead of getFormatFunction() == null.
   Also, since you've already validated the write transform configuration and 
you've just branched on getFormatFunction() == null, there's no need to call 
checkNotNull(getFormatFunction()) again.
   If you find yourself fighting the null checker, then inline your transform 
validation here in expand. Preconditions uses annotations to signal to the null 
checker that it has checked the nullness of a variable. Do make sure to assign 
the getter's result to a local variable first, though: repeated calls to the 
same method could return different values, so the null checker will not carry 
a null check on one call over to the next call.



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = 
checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> 
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be non-null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));
+
+      // Store the current window used by the input
+      PCollection<Solace.PublishResult> captureWindow =
+          records.apply(
+              "Capture window", ParDo.of(new 
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy 
=
+          (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+              captureWindow.getWindowingStrategy();
+
+      PCollection<Solace.Record> withGlobalWindow =
+          records.apply("Global window", Window.into(new GlobalWindows()));
+
+      PCollection<KV<Integer, Solace.Record>> withShardKeys =
+          withGlobalWindow.apply(
+              "Add shard key",
+              ParDo.of(new 
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+      String label =
+          getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : 
"Publish (batched)";
+
+      PCollectionTuple solaceOutput = withShardKeys.apply(label, 
getWriterTransform(destinationFn));
+
+      SolaceOutput output;
+      if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+        PCollection<Solace.PublishResult> failedPublish = 
solaceOutput.get(FAILED_PUBLISH_TAG);
+        PCollection<Solace.PublishResult> successfulPublish =
+            solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+        output =
+            rewindow(
+                SolaceOutput.in(input.getPipeline(), failedPublish, 
successfulPublish),
+                windowingStrategy);
+      } else {
+        LOG.info(
+            String.format(
+                "Solace.Write: omitting writer output because delivery mode is 
%s",
+                getDeliveryMode()));
+        output = SolaceOutput.in(input.getPipeline(), null, null);
+      }
+
+      return output;
+    }
+
+    private ParDo.MultiOutput<KV<Integer, Solace.Record>, 
Solace.PublishResult> getWriterTransform(
+        SerializableFunction<Solace.Record, Destination> destinationFn) {
+
+      ParDo.SingleOutput<KV<Integer, Solace.Record>, Solace.PublishResult> 
writer =
+          ParDo.of(
+              getWriterType() == WriterType.STREAMING
+                  ? new UnboundedStreamingSolaceWriter.WriterDoFn(
+                      destinationFn,
+                      checkNotNull(getSessionServiceFactory()),
+                      getDeliveryMode(),
+                      getDispatchMode(),
+                      getNumberOfClientsPerWorker(),
+                      getPublishLatencyMetrics())
+                  : new UnboundedBatchedSolaceWriter.WriterDoFn(
+                      destinationFn,
+                      checkNotNull(getSessionServiceFactory()),
+                      getDeliveryMode(),
+                      getDispatchMode(),
+                      getNumberOfClientsPerWorker(),
+                      getPublishLatencyMetrics()));
+
+      return writer.withOutputTags(FAILED_PUBLISH_TAG, 
TupleTagList.of(SUCCESSFUL_PUBLISH_TAG));
+    }
+
+    private SolaceOutput rewindow(
+        SolaceOutput solacePublishResult,
+        WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+      PCollection<Solace.PublishResult> correct = 
solacePublishResult.getSuccessfulPublish();
+      PCollection<Solace.PublishResult> failed = 
solacePublishResult.getFailedPublish();
+
+      PCollection<Solace.PublishResult> correctWithWindow = null;
+      PCollection<Solace.PublishResult> failedWithWindow = null;
+
+      if (correct != null) {
+        correctWithWindow = applyOriginalWindow(correct, strategy, "Rewindow 
correct");
+      }
+
+      if (failed != null) {
+        failedWithWindow = applyOriginalWindow(failed, strategy, "Rewindow 
failed");
+      }
+
+      return SolaceOutput.in(
+          solacePublishResult.getPipeline(), failedWithWindow, 
correctWithWindow);
+    }
+
+    private static PCollection<Solace.PublishResult> applyOriginalWindow(
+        PCollection<Solace.PublishResult> pcoll,
+        WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy,
+        String label) {
+      Window<Solace.PublishResult> originalWindow = 
captureWindowDetails(strategy);
+
+      if (strategy.getMode() == 
WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) {
+        originalWindow = originalWindow.accumulatingFiredPanes();
+      } else {
+        originalWindow = originalWindow.discardingFiredPanes();
+      }
+
+      return pcoll.apply(label, originalWindow);
+    }
+
+    private static Window<Solace.PublishResult> captureWindowDetails(
+        WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+      return Window.<Solace.PublishResult>into(strategy.getWindowFn())
+          .withAllowedLateness(strategy.getAllowedLateness())
+          .withOnTimeBehavior(strategy.getOnTimeBehavior())
+          .withTimestampCombiner(strategy.getTimestampCombiner())
+          .triggering(strategy.getTrigger());
+    }
+
+    /**
+     * Called before running the Pipeline to verify this transform is fully 
and correctly specified.
+     */
+    private void validateWriteTransform(boolean usingSolaceRecords) {
+      if (!usingSolaceRecords) {
+        Preconditions.checkArgument(
+            getFormatFunction() != null,
+            "SolaceIO.Write: If you are not using Solace.Record as the input 
type, you"
+                + " must set a format function using withFormatFunction().");
+      }

Review Comment:
   
   ```suggestion
         if (!usingSolaceRecords) {
           Preconditions.checkNotNull(
               getFormatFunction(),
               "SolaceIO.Write: If you are not using Solace.Record as the input 
type, you"
                   + " must set a format function using withFormatFunction().");
         }
   ```



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -961,6 +977,21 @@ public Write<T> withWriterType(WriterType writerType) {
       return toBuilder().setWriterType(writerType).build();
     }
 
+    /**
+     * Set the format function for your custom data type, and/or for dynamic 
destinations.
+     *
+     * <p>If you are using a custom data class, this function should return a 
{@link Solace.Record}
+     * corresponding to your custom data class instance.
+     *
+     * <p>If you are using this formatting function with dynamic destinations, 
you must ensure that
+     * you set the right value in the destination value of the {@link 
Solace.Record} messages.
+     *
+     * <p>In any other case, this format function is optional.
+     */
+    public Write<T> withFormatFunction(SerializableFunction<T, Solace.Record> 
formatFunction) {
+      return toBuilder().setFormatFunction(formatFunction).build();
+    }

Review Comment:
   This can be removed since the format function is provided as an argument to 
SolaceIO.write().



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = 
checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);

Review Comment:
   Comparing the type name to a fixed string makes this brittle under 
refactoring. Even if it were restated as calling equals on two classes, it 
wouldn't be necessary, since isAssignableFrom also returns true when both 
classes are the same (it does not require a strict supertype). Per the docs, 
TypeDescriptor's isSupertypeOf can be used instead of getting the raw type and 
calling isAssignableFrom on that. Also, if I'm not mistaken, the receiver and 
the argument should be reversed, since Solace.Record is the abstract supertype 
of any Solace.Record implementation.
   
    This can be rewritten as:
   ```suggestion
         boolean usingSolaceRecord = 
TypeDescriptor.of(Solace.Record.class).isSupertypeOf(checkNotNull(input.getTypeDescriptor()));
   ```



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/PublishResultsReceiver.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.write;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.data.Solace.PublishResult;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * This will receive all the publishing results asynchronously, from the 
callbacks done by Solace
+ * when the ack of publishing a persistent message is received. This is then 
used by the finish
+ * bundle method of the writer to emit the corresponding results as the output 
of the write
+ * connector.
+ */
+@Internal
+public final class PublishResultsReceiver {

Review Comment:
   This is likely not working as you intended.
   The singleton in this class receives the results for all publishers, meaning 
that any publish result may end up in the output PCollection of any 
SolaceIO.Write transform.
   My guess is you'd want a PublishResultsReceiver per instance of the transform.



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = 
checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> 
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be non-null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));
+
+      // Store the current window used by the input
+      PCollection<Solace.PublishResult> captureWindow =
+          records.apply(
+              "Capture window", ParDo.of(new 
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy 
=
+          (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+              captureWindow.getWindowingStrategy();
+
+      PCollection<Solace.Record> withGlobalWindow =
+          records.apply("Global window", Window.into(new GlobalWindows()));
+
+      PCollection<KV<Integer, Solace.Record>> withShardKeys =
+          withGlobalWindow.apply(
+              "Add shard key",
+              ParDo.of(new 
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+      String label =
+          getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : 
"Publish (batched)";
+
+      PCollectionTuple solaceOutput = withShardKeys.apply(label, 
getWriterTransform(destinationFn));
+
+      SolaceOutput output;
+      if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+        PCollection<Solace.PublishResult> failedPublish = 
solaceOutput.get(FAILED_PUBLISH_TAG);
+        PCollection<Solace.PublishResult> successfulPublish =
+            solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+        output =
+            rewindow(
+                SolaceOutput.in(input.getPipeline(), failedPublish, 
successfulPublish),
+                windowingStrategy);
+      } else {
+        LOG.info(
+            String.format(
+                "Solace.Write: omitting writer output because delivery mode is 
%s",
+                getDeliveryMode()));
+        output = SolaceOutput.in(input.getPipeline(), null, null);
+      }
+
+      return output;
+    }
+
+    private ParDo.MultiOutput<KV<Integer, Solace.Record>, 
Solace.PublishResult> getWriterTransform(
+        SerializableFunction<Solace.Record, Destination> destinationFn) {
+
+      ParDo.SingleOutput<KV<Integer, Solace.Record>, Solace.PublishResult> 
writer =
+          ParDo.of(
+              getWriterType() == WriterType.STREAMING
+                  ? new UnboundedStreamingSolaceWriter.WriterDoFn(
+                      destinationFn,
+                      checkNotNull(getSessionServiceFactory()),
+                      getDeliveryMode(),
+                      getDispatchMode(),
+                      getNumberOfClientsPerWorker(),
+                      getPublishLatencyMetrics())
+                  : new UnboundedBatchedSolaceWriter.WriterDoFn(
+                      destinationFn,
+                      checkNotNull(getSessionServiceFactory()),
+                      getDeliveryMode(),
+                      getDispatchMode(),
+                      getNumberOfClientsPerWorker(),
+                      getPublishLatencyMetrics()));
+
+      return writer.withOutputTags(FAILED_PUBLISH_TAG, 
TupleTagList.of(SUCCESSFUL_PUBLISH_TAG));
+    }
+
+    private SolaceOutput rewindow(
+        SolaceOutput solacePublishResult,
+        WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+      PCollection<Solace.PublishResult> correct = 
solacePublishResult.getSuccessfulPublish();
+      PCollection<Solace.PublishResult> failed = 
solacePublishResult.getFailedPublish();
+
+      PCollection<Solace.PublishResult> correctWithWindow = null;
+      PCollection<Solace.PublishResult> failedWithWindow = null;
+
+      if (correct != null) {
+        correctWithWindow = applyOriginalWindow(correct, strategy, "Rewindow 
correct");
+      }
+
+      if (failed != null) {
+        failedWithWindow = applyOriginalWindow(failed, strategy, "Rewindow 
failed");
+      }
+
+      return SolaceOutput.in(
+          solacePublishResult.getPipeline(), failedWithWindow, 
correctWithWindow);
+    }
+
+    private static PCollection<Solace.PublishResult> applyOriginalWindow(
+        PCollection<Solace.PublishResult> pcoll,
+        WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy,
+        String label) {
+      Window<Solace.PublishResult> originalWindow = 
captureWindowDetails(strategy);
+
+      if (strategy.getMode() == 
WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) {
+        originalWindow = originalWindow.accumulatingFiredPanes();
+      } else {
+        originalWindow = originalWindow.discardingFiredPanes();
+      }
+
+      return pcoll.apply(label, originalWindow);
+    }
+
+    private static Window<Solace.PublishResult> captureWindowDetails(
+        WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+      return Window.<Solace.PublishResult>into(strategy.getWindowFn())
+          .withAllowedLateness(strategy.getAllowedLateness())
+          .withOnTimeBehavior(strategy.getOnTimeBehavior())
+          .withTimestampCombiner(strategy.getTimestampCombiner())
+          .triggering(strategy.getTrigger());
+    }
+
+    /**
+     * Called before running the Pipeline to verify this transform is fully 
and correctly specified.
+     */
+    private void validateWriteTransform(boolean usingSolaceRecords) {

Review Comment:
   Functions in Preconditions are generally imported using import static.



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = 
checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> 
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be non-null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));
+
+      // Store the current window used by the input
+      PCollection<Solace.PublishResult> captureWindow =
+          records.apply(
+              "Capture window", ParDo.of(new 
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy 
=
+          (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+              captureWindow.getWindowingStrategy();
+
+      PCollection<Solace.Record> withGlobalWindow =
+          records.apply("Global window", Window.into(new GlobalWindows()));
+
+      PCollection<KV<Integer, Solace.Record>> withShardKeys =
+          withGlobalWindow.apply(
+              "Add shard key",
+              ParDo.of(new 
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+      String label =
+          getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : 
"Publish (batched)";
+
+      PCollectionTuple solaceOutput = withShardKeys.apply(label, 
getWriterTransform(destinationFn));
+

Review Comment:
   The sharding that's applied here couples Beam's internal parallelism to the 
number of clients you intend to use (1 per transform instance per worker). 
Using GroupIntoBatches.withShardedKey() instead makes more sense, since Solace 
mentions their producer is thread safe for some producer patterns.
   
   On that topic (and feel free to correct me if I'm wrong): since this 
transform is producing session-dependent messages, the producer may only be 
accessed by multiple threads for direct messages.
   The transform switches the producer index on a per-bundle basis, but it does 
not exclusively claim the producer, so multiple threads can access a producer 
regardless of the delivery mode.
   To uncomplicate the thread safety conditions of the producer you could 
complicate access to the producer. By putting the producer on its own thread 
and exposing an intermediate channel for message passing (e.g. a concurrent 
queue) you can decouple Beam's concurrency from the number of producers (1 per 
distinct configuration). The producer thread polls the channel to receive 
sendable work from multiple producers and sets up callbacks per the message's 
configuration (callbacks should be per instance, see other comment).
   This carries minor overhead for session-dependent direct messages, but 
simplifies the rest of the setup. You could optionally use a Guava cache to 
expire and close producers after a period of inactivity. My remaining question 
here is why isn't this using session-independent messages?
   
   Back to GroupIntoBatches, it covers all of the state and timer stuff that's 
happening in the writer fn and applying it would simplify the writer a great 
deal. It looks like it might not even need state and timers at all.



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.JCSMPSendMultipleEntry;
+import com.solacesystems.jcsmp.XMLMessageProducer;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.RetryCallableManager;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+@Internal
+public class SolaceMessageProducer extends MessageProducer {
+
+  private final XMLMessageProducer producer;
+  private final RetryCallableManager retryCallableManager = 
RetryCallableManager.create();
+
+  public SolaceMessageProducer(XMLMessageProducer producer) {
+    this.producer = producer;
+  }
+
+  @Override
+  public void publishSingleMessage(
+      Solace.Record record,
+      Destination topicOrQueue,
+      boolean useCorrelationKeyLatency,
+      DeliveryMode deliveryMode) {
+    BytesXMLMessage msg = createBytesXMLMessage(record, 
useCorrelationKeyLatency, deliveryMode);
+    Callable<Integer> publish =
+        () -> {
+          producer.send(msg, topicOrQueue);
+          return 0;
+        };
+
+    retryCallableManager.retryCallable(publish, 
ImmutableSet.of(JCSMPException.class));

Review Comment:
   There's also an existing dependency on Failsafe 
(https://github.com/failsafe-lib/failsafe) in Beam, which I'd probably 
recommend using.



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = 
checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> 
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be non-null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));
+
+      // Store the current window used by the input
+      PCollection<Solace.PublishResult> captureWindow =
+          records.apply(
+              "Capture window", ParDo.of(new 
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy 
=
+          (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+              captureWindow.getWindowingStrategy();
+
+      PCollection<Solace.Record> withGlobalWindow =
+          records.apply("Global window", Window.into(new GlobalWindows()));
+
+      PCollection<KV<Integer, Solace.Record>> withShardKeys =
+          withGlobalWindow.apply(
+              "Add shard key",
+              ParDo.of(new 
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+      String label =
+          getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : 
"Publish (batched)";
+
+      PCollectionTuple solaceOutput = withShardKeys.apply(label, 
getWriterTransform(destinationFn));
+
+      SolaceOutput output;
+      if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+        PCollection<Solace.PublishResult> failedPublish = 
solaceOutput.get(FAILED_PUBLISH_TAG);
+        PCollection<Solace.PublishResult> successfulPublish =
+            solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+        output =
+            rewindow(
+                SolaceOutput.in(input.getPipeline(), failedPublish, 
successfulPublish),
+                windowingStrategy);
+      } else {
+        LOG.info(
+            String.format(
+                "Solace.Write: omitting writer output because delivery mode is 
%s",
+                getDeliveryMode()));
+        output = SolaceOutput.in(input.getPipeline(), null, null);
+      }
+
+      return output;

Review Comment:
   Do both of these need to be the same output type? PublishResult contains a 
boolean for success or failure, metrics and an error message. It seems to me 
like you would want to provide the user a PCollection<Solace.Record> for 
successes, PCollection<DlqErrorT> for failures and PCollection<SolaceStatsT> 
for metrics.
   See the note about using the new DLQ pattern as well.



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
 
     @Override
     public SolaceOutput expand(PCollection<T> input) {
-      // TODO: will be sent in upcoming PR
-      return SolaceOutput.in(input.getPipeline(), null, null);
+      Class<? super T> pcollClass = 
checkNotNull(input.getTypeDescriptor()).getRawType();
+      boolean usingSolaceRecord =
+          pcollClass
+                  .getTypeName()
+                  
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+              || pcollClass.isAssignableFrom(Solace.Record.class);
+
+      validateWriteTransform(usingSolaceRecord);
+
+      boolean usingDynamicDestinations = getDestination() == null;
+      SerializableFunction<Solace.Record, Destination> destinationFn;
+      if (usingDynamicDestinations) {
+        destinationFn = x -> 
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+      } else {
+        // Constant destination for all messages (same topic or queue)
+        // This should not be non-null, as nulls would have been flagged by the
+        // validateWriteTransform method
+        destinationFn = x -> checkNotNull(getDestination());
+      }
+
+      @SuppressWarnings("unchecked")
+      PCollection<Solace.Record> records =
+          getFormatFunction() == null
+              ? (PCollection<Solace.Record>) input
+              : input.apply(
+                  "Format records",
+                  MapElements.into(TypeDescriptor.of(Solace.Record.class))
+                      .via(checkNotNull(getFormatFunction())));
+
+      // Store the current window used by the input
+      PCollection<Solace.PublishResult> captureWindow =
+          records.apply(
+              "Capture window", ParDo.of(new 
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy 
=
+          (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+              captureWindow.getWindowingStrategy();
+

Review Comment:
   I'm not sure if transplanting the windowing strategy is a valid operation. 
Given that most connectors do not preserve the input windowing strategy, it 
might not be expected behavior either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to