sjvanrossum commented on code in PR #32060:
URL: https://github.com/apache/beam/pull/32060#discussion_r1750945244
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
@Override
public SolaceOutput expand(PCollection<T> input) {
- // TODO: will be sent in upcoming PR
- return SolaceOutput.in(input.getPipeline(), null, null);
+ Class<? super T> pcollClass =
checkNotNull(input.getTypeDescriptor()).getRawType();
+ boolean usingSolaceRecord =
+ pcollClass
+ .getTypeName()
+
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+ || pcollClass.isAssignableFrom(Solace.Record.class);
+
+ validateWriteTransform(usingSolaceRecord);
+
+ boolean usingDynamicDestinations = getDestination() == null;
+ SerializableFunction<Solace.Record, Destination> destinationFn;
+ if (usingDynamicDestinations) {
+ destinationFn = x ->
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+ } else {
+ // Constant destination for all messages (same topic or queue)
+ // This should not be non-null, as nulls would have been flagged by the
+ // validateWriteTransform method
+ destinationFn = x -> checkNotNull(getDestination());
+ }
+
+ @SuppressWarnings("unchecked")
+ PCollection<Solace.Record> records =
+ getFormatFunction() == null
+ ? (PCollection<Solace.Record>) input
+ : input.apply(
+ "Format records",
+ MapElements.into(TypeDescriptor.of(Solace.Record.class))
+ .via(checkNotNull(getFormatFunction())));
+
+ // Store the current window used by the input
+ PCollection<Solace.PublishResult> captureWindow =
+ records.apply(
+ "Capture window", ParDo.of(new
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy
=
+ (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+ captureWindow.getWindowingStrategy();
+
+ PCollection<Solace.Record> withGlobalWindow =
+ records.apply("Global window", Window.into(new GlobalWindows()));
+
+ PCollection<KV<Integer, Solace.Record>> withShardKeys =
+ withGlobalWindow.apply(
+ "Add shard key",
+ ParDo.of(new
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+ String label =
+ getWriterType() == WriterType.STREAMING ? "Publish (streaming)" :
"Publish (batched)";
+
+ PCollectionTuple solaceOutput = withShardKeys.apply(label,
getWriterTransform(destinationFn));
+
+ SolaceOutput output;
+ if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+ PCollection<Solace.PublishResult> failedPublish =
solaceOutput.get(FAILED_PUBLISH_TAG);
+ PCollection<Solace.PublishResult> successfulPublish =
+ solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+ output =
+ rewindow(
+ SolaceOutput.in(input.getPipeline(), failedPublish,
successfulPublish),
+ windowingStrategy);
+ } else {
+ LOG.info(
+ String.format(
+ "Solace.Write: omitting writer output because delivery mode is
%s",
+ getDeliveryMode()));
+ output = SolaceOutput.in(input.getPipeline(), null, null);
+ }
+
+ return output;
+ }
+
+ private ParDo.MultiOutput<KV<Integer, Solace.Record>,
Solace.PublishResult> getWriterTransform(
+ SerializableFunction<Solace.Record, Destination> destinationFn) {
+
+ ParDo.SingleOutput<KV<Integer, Solace.Record>, Solace.PublishResult>
writer =
+ ParDo.of(
+ getWriterType() == WriterType.STREAMING
+ ? new UnboundedStreamingSolaceWriter.WriterDoFn(
+ destinationFn,
+ checkNotNull(getSessionServiceFactory()),
+ getDeliveryMode(),
+ getDispatchMode(),
+ getNumberOfClientsPerWorker(),
+ getPublishLatencyMetrics())
+ : new UnboundedBatchedSolaceWriter.WriterDoFn(
+ destinationFn,
+ checkNotNull(getSessionServiceFactory()),
+ getDeliveryMode(),
+ getDispatchMode(),
+ getNumberOfClientsPerWorker(),
+ getPublishLatencyMetrics()));
+
+ return writer.withOutputTags(FAILED_PUBLISH_TAG,
TupleTagList.of(SUCCESSFUL_PUBLISH_TAG));
+ }
+
+ private SolaceOutput rewindow(
+ SolaceOutput solacePublishResult,
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+ PCollection<Solace.PublishResult> correct =
solacePublishResult.getSuccessfulPublish();
+ PCollection<Solace.PublishResult> failed =
solacePublishResult.getFailedPublish();
+
+ PCollection<Solace.PublishResult> correctWithWindow = null;
+ PCollection<Solace.PublishResult> failedWithWindow = null;
+
+ if (correct != null) {
+ correctWithWindow = applyOriginalWindow(correct, strategy, "Rewindow
correct");
+ }
+
+ if (failed != null) {
+ failedWithWindow = applyOriginalWindow(failed, strategy, "Rewindow
failed");
+ }
+
+ return SolaceOutput.in(
+ solacePublishResult.getPipeline(), failedWithWindow,
correctWithWindow);
+ }
+
+ private static PCollection<Solace.PublishResult> applyOriginalWindow(
+ PCollection<Solace.PublishResult> pcoll,
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy,
+ String label) {
+ Window<Solace.PublishResult> originalWindow =
captureWindowDetails(strategy);
+
+ if (strategy.getMode() ==
WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) {
+ originalWindow = originalWindow.accumulatingFiredPanes();
+ } else {
+ originalWindow = originalWindow.discardingFiredPanes();
+ }
+
+ return pcoll.apply(label, originalWindow);
+ }
+
+ private static Window<Solace.PublishResult> captureWindowDetails(
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+ return Window.<Solace.PublishResult>into(strategy.getWindowFn())
+ .withAllowedLateness(strategy.getAllowedLateness())
+ .withOnTimeBehavior(strategy.getOnTimeBehavior())
+ .withTimestampCombiner(strategy.getTimestampCombiner())
+ .triggering(strategy.getTrigger());
+ }
+
+ /**
+ * Called before running the Pipeline to verify this transform is fully
and correctly specified.
+ */
+ private void validateWriteTransform(boolean usingSolaceRecords) {
+ if (!usingSolaceRecords) {
+ Preconditions.checkArgument(
+ getFormatFunction() != null,
+ "SolaceIO.Write: If you are not using Solace.Record as the input
type, you"
+ + " must set a format function using withFormatFunction().");
+ }
+
+ Preconditions.checkArgument(
+ getMaxNumOfUsedWorkers() > 0,
+ "SolaceIO.Write: The number of used workers must be positive.");
+ Preconditions.checkArgument(
+ getNumberOfClientsPerWorker() > 0,
+ "SolaceIO.Write: The number of clients per worker must be
positive.");
+ Preconditions.checkArgument(
+ getDeliveryMode() == DeliveryMode.DIRECT || getDeliveryMode() ==
DeliveryMode.PERSISTENT,
+ String.format(
+ "SolaceIO.Write: Delivery mode must be either DIRECT or
PERSISTENT. %s"
+ + " not supported",
+ getDeliveryMode()));
+ if (getPublishLatencyMetrics()) {
+ Preconditions.checkArgument(
+ getDeliveryMode() == DeliveryMode.PERSISTENT,
+ "SolaceIO.Write: Publish latency metrics can only be enabled for
PERSISTENT"
+ + " delivery mode.");
+ }
+ Preconditions.checkArgument(
+ getSessionServiceFactory() != null,
+ "SolaceIO: You need to pass a session service factory. For basic"
+ + " authentication, you can use
BasicAuthJcsmpSessionServiceFactory.");
Review Comment:
```suggestion
Preconditions.checkNotNull(
getSessionServiceFactory(),
"SolaceIO: You need to pass a session service factory. For basic"
+ " authentication, you can use
BasicAuthJcsmpSessionServiceFactory.");
```
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
@Override
public SolaceOutput expand(PCollection<T> input) {
- // TODO: will be sent in upcoming PR
- return SolaceOutput.in(input.getPipeline(), null, null);
+ Class<? super T> pcollClass =
checkNotNull(input.getTypeDescriptor()).getRawType();
+ boolean usingSolaceRecord =
+ pcollClass
+ .getTypeName()
+
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+ || pcollClass.isAssignableFrom(Solace.Record.class);
+
+ validateWriteTransform(usingSolaceRecord);
+
+ boolean usingDynamicDestinations = getDestination() == null;
+ SerializableFunction<Solace.Record, Destination> destinationFn;
+ if (usingDynamicDestinations) {
+ destinationFn = x ->
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+ } else {
+ // Constant destination for all messages (same topic or queue)
+ // This should not be non-null, as nulls would have been flagged by the
+ // validateWriteTransform method
+ destinationFn = x -> checkNotNull(getDestination());
+ }
+
+ @SuppressWarnings("unchecked")
+ PCollection<Solace.Record> records =
+ getFormatFunction() == null
+ ? (PCollection<Solace.Record>) input
+ : input.apply(
+ "Format records",
+ MapElements.into(TypeDescriptor.of(Solace.Record.class))
+ .via(checkNotNull(getFormatFunction())));
Review Comment:
If you remove Write.withFormatFunction, then use usingSolaceRecord here
instead of getFormatFunction() == null.
Also, since you've already validated the write transform configuration and
you've just branched on getFormatFunction() == null, there's no need to call
checkNotNull(getFormatFunction()) again.
If you find yourself fighting the null checker, then inline your transform
validation here in expand. The methods in Preconditions are annotated to signal
to the null checker that the nullness of the checked value has been verified.
Do make sure to assign the getter's result to a fresh local variable first,
though: repeated calls to the same method could return different values, so the
null checker will flag checking one call and then using another as a potential
issue.
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
@Override
public SolaceOutput expand(PCollection<T> input) {
- // TODO: will be sent in upcoming PR
- return SolaceOutput.in(input.getPipeline(), null, null);
+ Class<? super T> pcollClass =
checkNotNull(input.getTypeDescriptor()).getRawType();
+ boolean usingSolaceRecord =
+ pcollClass
+ .getTypeName()
+
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+ || pcollClass.isAssignableFrom(Solace.Record.class);
+
+ validateWriteTransform(usingSolaceRecord);
+
+ boolean usingDynamicDestinations = getDestination() == null;
+ SerializableFunction<Solace.Record, Destination> destinationFn;
+ if (usingDynamicDestinations) {
+ destinationFn = x ->
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+ } else {
+ // Constant destination for all messages (same topic or queue)
+ // This should not be non-null, as nulls would have been flagged by the
+ // validateWriteTransform method
+ destinationFn = x -> checkNotNull(getDestination());
+ }
+
+ @SuppressWarnings("unchecked")
+ PCollection<Solace.Record> records =
+ getFormatFunction() == null
+ ? (PCollection<Solace.Record>) input
+ : input.apply(
+ "Format records",
+ MapElements.into(TypeDescriptor.of(Solace.Record.class))
+ .via(checkNotNull(getFormatFunction())));
+
+ // Store the current window used by the input
+ PCollection<Solace.PublishResult> captureWindow =
+ records.apply(
+ "Capture window", ParDo.of(new
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy
=
+ (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+ captureWindow.getWindowingStrategy();
+
+ PCollection<Solace.Record> withGlobalWindow =
+ records.apply("Global window", Window.into(new GlobalWindows()));
+
+ PCollection<KV<Integer, Solace.Record>> withShardKeys =
+ withGlobalWindow.apply(
+ "Add shard key",
+ ParDo.of(new
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+ String label =
+ getWriterType() == WriterType.STREAMING ? "Publish (streaming)" :
"Publish (batched)";
+
+ PCollectionTuple solaceOutput = withShardKeys.apply(label,
getWriterTransform(destinationFn));
+
+ SolaceOutput output;
+ if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+ PCollection<Solace.PublishResult> failedPublish =
solaceOutput.get(FAILED_PUBLISH_TAG);
+ PCollection<Solace.PublishResult> successfulPublish =
+ solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+ output =
+ rewindow(
+ SolaceOutput.in(input.getPipeline(), failedPublish,
successfulPublish),
+ windowingStrategy);
+ } else {
+ LOG.info(
+ String.format(
+ "Solace.Write: omitting writer output because delivery mode is
%s",
+ getDeliveryMode()));
+ output = SolaceOutput.in(input.getPipeline(), null, null);
+ }
+
+ return output;
+ }
+
+ private ParDo.MultiOutput<KV<Integer, Solace.Record>,
Solace.PublishResult> getWriterTransform(
+ SerializableFunction<Solace.Record, Destination> destinationFn) {
+
+ ParDo.SingleOutput<KV<Integer, Solace.Record>, Solace.PublishResult>
writer =
+ ParDo.of(
+ getWriterType() == WriterType.STREAMING
+ ? new UnboundedStreamingSolaceWriter.WriterDoFn(
+ destinationFn,
+ checkNotNull(getSessionServiceFactory()),
+ getDeliveryMode(),
+ getDispatchMode(),
+ getNumberOfClientsPerWorker(),
+ getPublishLatencyMetrics())
+ : new UnboundedBatchedSolaceWriter.WriterDoFn(
+ destinationFn,
+ checkNotNull(getSessionServiceFactory()),
+ getDeliveryMode(),
+ getDispatchMode(),
+ getNumberOfClientsPerWorker(),
+ getPublishLatencyMetrics()));
+
+ return writer.withOutputTags(FAILED_PUBLISH_TAG,
TupleTagList.of(SUCCESSFUL_PUBLISH_TAG));
+ }
+
+ private SolaceOutput rewindow(
+ SolaceOutput solacePublishResult,
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+ PCollection<Solace.PublishResult> correct =
solacePublishResult.getSuccessfulPublish();
+ PCollection<Solace.PublishResult> failed =
solacePublishResult.getFailedPublish();
+
+ PCollection<Solace.PublishResult> correctWithWindow = null;
+ PCollection<Solace.PublishResult> failedWithWindow = null;
+
+ if (correct != null) {
+ correctWithWindow = applyOriginalWindow(correct, strategy, "Rewindow
correct");
+ }
+
+ if (failed != null) {
+ failedWithWindow = applyOriginalWindow(failed, strategy, "Rewindow
failed");
+ }
+
+ return SolaceOutput.in(
+ solacePublishResult.getPipeline(), failedWithWindow,
correctWithWindow);
+ }
+
+ private static PCollection<Solace.PublishResult> applyOriginalWindow(
+ PCollection<Solace.PublishResult> pcoll,
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy,
+ String label) {
+ Window<Solace.PublishResult> originalWindow =
captureWindowDetails(strategy);
+
+ if (strategy.getMode() ==
WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) {
+ originalWindow = originalWindow.accumulatingFiredPanes();
+ } else {
+ originalWindow = originalWindow.discardingFiredPanes();
+ }
+
+ return pcoll.apply(label, originalWindow);
+ }
+
+ private static Window<Solace.PublishResult> captureWindowDetails(
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+ return Window.<Solace.PublishResult>into(strategy.getWindowFn())
+ .withAllowedLateness(strategy.getAllowedLateness())
+ .withOnTimeBehavior(strategy.getOnTimeBehavior())
+ .withTimestampCombiner(strategy.getTimestampCombiner())
+ .triggering(strategy.getTrigger());
+ }
+
+ /**
+ * Called before running the Pipeline to verify this transform is fully
and correctly specified.
+ */
+ private void validateWriteTransform(boolean usingSolaceRecords) {
+ if (!usingSolaceRecords) {
+ Preconditions.checkArgument(
+ getFormatFunction() != null,
+ "SolaceIO.Write: If you are not using Solace.Record as the input
type, you"
+ + " must set a format function using withFormatFunction().");
+ }
Review Comment:
```suggestion
if (!usingSolaceRecords) {
Preconditions.checkNotNull(
getFormatFunction(),
"SolaceIO.Write: If you are not using Solace.Record as the input
type, you"
+ " must set a format function using withFormatFunction().");
}
```
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -961,6 +977,21 @@ public Write<T> withWriterType(WriterType writerType) {
return toBuilder().setWriterType(writerType).build();
}
+ /**
+ * Set the format function for your custom data type, and/or for dynamic
destinations.
+ *
+ * <p>If you are using a custom data class, this function should return a
{@link Solace.Record}
+ * corresponding to your custom data class instance.
+ *
+ * <p>If you are using this formatting function with dynamic destinations,
you must ensure that
+ * you set the right value in the destination value of the {@link
Solace.Record} messages.
+ *
+ * <p>In any other case, this format function is optional.
+ */
+ public Write<T> withFormatFunction(SerializableFunction<T, Solace.Record>
formatFunction) {
+ return toBuilder().setFormatFunction(formatFunction).build();
+ }
Review Comment:
This can be removed since the format function is provided as an argument to
SolaceIO.write().
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
@Override
public SolaceOutput expand(PCollection<T> input) {
- // TODO: will be sent in upcoming PR
- return SolaceOutput.in(input.getPipeline(), null, null);
+ Class<? super T> pcollClass =
checkNotNull(input.getTypeDescriptor()).getRawType();
+ boolean usingSolaceRecord =
+ pcollClass
+ .getTypeName()
+
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+ || pcollClass.isAssignableFrom(Solace.Record.class);
Review Comment:
Comparing the type name to a fixed string makes this fragile under refactors.
Even if it were restated as calling equals on two classes, it wouldn't be
necessary, since isAssignableFrom also returns true when both classes are the
same (it does not require a strict supertype). Per the docs, TypeDescriptor's
isSuperTypeOf can be used instead of getting the raw type and calling
isAssignableFrom on that. Now, if I'm not mistaken, the order of the arguments
should be reversed, since Solace.Record is the abstract supertype of any
Solace.Record implementation.
This can be rewritten as:
```suggestion
boolean usingSolaceRecord =
TypeDescriptor.of(Solace.Record.class).isSuperTypeOf(checkNotNull(input.getTypeDescriptor()));
```
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/PublishResultsReceiver.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.write;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.data.Solace.PublishResult;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * This will receive all the publishing results asynchronously, from the
callbacks done by Solace
+ * when the ack of publishing a persistent message is received. This is then
used by the finish
+ * bundle method of the writer to emit the corresponding results as the output
of the write
+ * connector.
+ */
+@Internal
+public final class PublishResultsReceiver {
Review Comment:
This is likely not working as you intended.
The singleton in this class receives the results for all publishers, meaning
that any publish result may end up in the output PCollection of any
SolaceIO.Write transform.
My guess is you'd want a PublishResultsReceiver per instance of the transform.
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
@Override
public SolaceOutput expand(PCollection<T> input) {
- // TODO: will be sent in upcoming PR
- return SolaceOutput.in(input.getPipeline(), null, null);
+ Class<? super T> pcollClass =
checkNotNull(input.getTypeDescriptor()).getRawType();
+ boolean usingSolaceRecord =
+ pcollClass
+ .getTypeName()
+
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+ || pcollClass.isAssignableFrom(Solace.Record.class);
+
+ validateWriteTransform(usingSolaceRecord);
+
+ boolean usingDynamicDestinations = getDestination() == null;
+ SerializableFunction<Solace.Record, Destination> destinationFn;
+ if (usingDynamicDestinations) {
+ destinationFn = x ->
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+ } else {
+ // Constant destination for all messages (same topic or queue)
+ // This should not be non-null, as nulls would have been flagged by the
+ // validateWriteTransform method
+ destinationFn = x -> checkNotNull(getDestination());
+ }
+
+ @SuppressWarnings("unchecked")
+ PCollection<Solace.Record> records =
+ getFormatFunction() == null
+ ? (PCollection<Solace.Record>) input
+ : input.apply(
+ "Format records",
+ MapElements.into(TypeDescriptor.of(Solace.Record.class))
+ .via(checkNotNull(getFormatFunction())));
+
+ // Store the current window used by the input
+ PCollection<Solace.PublishResult> captureWindow =
+ records.apply(
+ "Capture window", ParDo.of(new
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy
=
+ (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+ captureWindow.getWindowingStrategy();
+
+ PCollection<Solace.Record> withGlobalWindow =
+ records.apply("Global window", Window.into(new GlobalWindows()));
+
+ PCollection<KV<Integer, Solace.Record>> withShardKeys =
+ withGlobalWindow.apply(
+ "Add shard key",
+ ParDo.of(new
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+ String label =
+ getWriterType() == WriterType.STREAMING ? "Publish (streaming)" :
"Publish (batched)";
+
+ PCollectionTuple solaceOutput = withShardKeys.apply(label,
getWriterTransform(destinationFn));
+
+ SolaceOutput output;
+ if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+ PCollection<Solace.PublishResult> failedPublish =
solaceOutput.get(FAILED_PUBLISH_TAG);
+ PCollection<Solace.PublishResult> successfulPublish =
+ solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+ output =
+ rewindow(
+ SolaceOutput.in(input.getPipeline(), failedPublish,
successfulPublish),
+ windowingStrategy);
+ } else {
+ LOG.info(
+ String.format(
+ "Solace.Write: omitting writer output because delivery mode is
%s",
+ getDeliveryMode()));
+ output = SolaceOutput.in(input.getPipeline(), null, null);
+ }
+
+ return output;
+ }
+
+ private ParDo.MultiOutput<KV<Integer, Solace.Record>,
Solace.PublishResult> getWriterTransform(
+ SerializableFunction<Solace.Record, Destination> destinationFn) {
+
+ ParDo.SingleOutput<KV<Integer, Solace.Record>, Solace.PublishResult>
writer =
+ ParDo.of(
+ getWriterType() == WriterType.STREAMING
+ ? new UnboundedStreamingSolaceWriter.WriterDoFn(
+ destinationFn,
+ checkNotNull(getSessionServiceFactory()),
+ getDeliveryMode(),
+ getDispatchMode(),
+ getNumberOfClientsPerWorker(),
+ getPublishLatencyMetrics())
+ : new UnboundedBatchedSolaceWriter.WriterDoFn(
+ destinationFn,
+ checkNotNull(getSessionServiceFactory()),
+ getDeliveryMode(),
+ getDispatchMode(),
+ getNumberOfClientsPerWorker(),
+ getPublishLatencyMetrics()));
+
+ return writer.withOutputTags(FAILED_PUBLISH_TAG,
TupleTagList.of(SUCCESSFUL_PUBLISH_TAG));
+ }
+
+ private SolaceOutput rewindow(
+ SolaceOutput solacePublishResult,
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+ PCollection<Solace.PublishResult> correct =
solacePublishResult.getSuccessfulPublish();
+ PCollection<Solace.PublishResult> failed =
solacePublishResult.getFailedPublish();
+
+ PCollection<Solace.PublishResult> correctWithWindow = null;
+ PCollection<Solace.PublishResult> failedWithWindow = null;
+
+ if (correct != null) {
+ correctWithWindow = applyOriginalWindow(correct, strategy, "Rewindow
correct");
+ }
+
+ if (failed != null) {
+ failedWithWindow = applyOriginalWindow(failed, strategy, "Rewindow
failed");
+ }
+
+ return SolaceOutput.in(
+ solacePublishResult.getPipeline(), failedWithWindow,
correctWithWindow);
+ }
+
+ private static PCollection<Solace.PublishResult> applyOriginalWindow(
+ PCollection<Solace.PublishResult> pcoll,
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy,
+ String label) {
+ Window<Solace.PublishResult> originalWindow =
captureWindowDetails(strategy);
+
+ if (strategy.getMode() ==
WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) {
+ originalWindow = originalWindow.accumulatingFiredPanes();
+ } else {
+ originalWindow = originalWindow.discardingFiredPanes();
+ }
+
+ return pcoll.apply(label, originalWindow);
+ }
+
+ private static Window<Solace.PublishResult> captureWindowDetails(
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> strategy) {
+ return Window.<Solace.PublishResult>into(strategy.getWindowFn())
+ .withAllowedLateness(strategy.getAllowedLateness())
+ .withOnTimeBehavior(strategy.getOnTimeBehavior())
+ .withTimestampCombiner(strategy.getTimestampCombiner())
+ .triggering(strategy.getTrigger());
+ }
+
+ /**
+ * Called before running the Pipeline to verify this transform is fully
and correctly specified.
+ */
+ private void validateWriteTransform(boolean usingSolaceRecords) {
Review Comment:
Functions in Preconditions are generally imported using import static.
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
@Override
public SolaceOutput expand(PCollection<T> input) {
- // TODO: will be sent in upcoming PR
- return SolaceOutput.in(input.getPipeline(), null, null);
+ Class<? super T> pcollClass =
checkNotNull(input.getTypeDescriptor()).getRawType();
+ boolean usingSolaceRecord =
+ pcollClass
+ .getTypeName()
+
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+ || pcollClass.isAssignableFrom(Solace.Record.class);
+
+ validateWriteTransform(usingSolaceRecord);
+
+ boolean usingDynamicDestinations = getDestination() == null;
+ SerializableFunction<Solace.Record, Destination> destinationFn;
+ if (usingDynamicDestinations) {
+ destinationFn = x ->
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+ } else {
+ // Constant destination for all messages (same topic or queue)
+ // This should not be non-null, as nulls would have been flagged by the
+ // validateWriteTransform method
+ destinationFn = x -> checkNotNull(getDestination());
+ }
+
+ @SuppressWarnings("unchecked")
+ PCollection<Solace.Record> records =
+ getFormatFunction() == null
+ ? (PCollection<Solace.Record>) input
+ : input.apply(
+ "Format records",
+ MapElements.into(TypeDescriptor.of(Solace.Record.class))
+ .via(checkNotNull(getFormatFunction())));
+
+ // Store the current window used by the input
+ PCollection<Solace.PublishResult> captureWindow =
+ records.apply(
+ "Capture window", ParDo.of(new
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy
=
+ (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+ captureWindow.getWindowingStrategy();
+
+ PCollection<Solace.Record> withGlobalWindow =
+ records.apply("Global window", Window.into(new GlobalWindows()));
+
+ PCollection<KV<Integer, Solace.Record>> withShardKeys =
+ withGlobalWindow.apply(
+ "Add shard key",
+ ParDo.of(new
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+ String label =
+ getWriterType() == WriterType.STREAMING ? "Publish (streaming)" :
"Publish (batched)";
+
+ PCollectionTuple solaceOutput = withShardKeys.apply(label,
getWriterTransform(destinationFn));
+
Review Comment:
The sharding that's applied here couples Beam's internal parallelism to the
number of clients you intend to use (1 per transform instance per worker).
Using GroupIntoBatches.withShardedKey() instead makes more sense, since Solace
mentions their producer is thread safe for some producer patterns.
On that topic, and feel free to correct me if I'm wrong, since this
transform is producing session-dependent messages the producer may only be
accessed by multiple threads for direct messages.
The transform switches the producer index on a per bundle basis, but it does
not exclusively claim the producer, so multiple threads can access a producer
regardless of the delivery mode.
To uncomplicate the thread safety conditions of the producer you could
complicate access to the producer. By putting the producer on its own thread
and exposing an intermediate channel for message passing (e.g. a concurrent
queue) you can decouple Beam's concurrency from the number of producers (1 per
distinct configuration). The producer thread polls the channel to receive
sendable work from multiple producers and sets up callbacks per the message's
configuration (callbacks should be per instance, see other comment).
This carries minor overhead for session-dependent direct messages, but
simplifies the rest of the setup. You could optionally use a Guava cache to
expire and close producers after a period of inactivity. My remaining question
here is: why isn't this using session-independent messages?
Back to GroupIntoBatches, it covers all of the state and timer stuff that's
happening in the writer fn and applying it would simplify the writer a great
deal. It looks like it might not even need state and timers at all.
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SolaceMessageProducer.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.broker;
+
+import com.solacesystems.jcsmp.BytesXMLMessage;
+import com.solacesystems.jcsmp.DeliveryMode;
+import com.solacesystems.jcsmp.Destination;
+import com.solacesystems.jcsmp.JCSMPException;
+import com.solacesystems.jcsmp.JCSMPSendMultipleEntry;
+import com.solacesystems.jcsmp.XMLMessageProducer;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.RetryCallableManager;
+import org.apache.beam.sdk.io.solace.data.Solace;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+@Internal
+public class SolaceMessageProducer extends MessageProducer {
+
+ private final XMLMessageProducer producer;
+ private final RetryCallableManager retryCallableManager =
RetryCallableManager.create();
+
+ public SolaceMessageProducer(XMLMessageProducer producer) {
+ this.producer = producer;
+ }
+
+ @Override
+ public void publishSingleMessage(
+ Solace.Record record,
+ Destination topicOrQueue,
+ boolean useCorrelationKeyLatency,
+ DeliveryMode deliveryMode) {
+ BytesXMLMessage msg = createBytesXMLMessage(record,
useCorrelationKeyLatency, deliveryMode);
+ Callable<Integer> publish =
+ () -> {
+ producer.send(msg, topicOrQueue);
+ return 0;
+ };
+
+ retryCallableManager.retryCallable(publish,
ImmutableSet.of(JCSMPException.class));
Review Comment:
There's also an existing dependency on Failsafe
(https://github.com/failsafe-lib/failsafe) in Beam, which I'd probably
recommend using.
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
@Override
public SolaceOutput expand(PCollection<T> input) {
- // TODO: will be sent in upcoming PR
- return SolaceOutput.in(input.getPipeline(), null, null);
+ Class<? super T> pcollClass =
checkNotNull(input.getTypeDescriptor()).getRawType();
+ boolean usingSolaceRecord =
+ pcollClass
+ .getTypeName()
+
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+ || pcollClass.isAssignableFrom(Solace.Record.class);
+
+ validateWriteTransform(usingSolaceRecord);
+
+ boolean usingDynamicDestinations = getDestination() == null;
+ SerializableFunction<Solace.Record, Destination> destinationFn;
+ if (usingDynamicDestinations) {
+ destinationFn = x ->
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+ } else {
+ // Constant destination for all messages (same topic or queue)
+ // This should not be non-null, as nulls would have been flagged by the
+ // validateWriteTransform method
+ destinationFn = x -> checkNotNull(getDestination());
+ }
+
+ @SuppressWarnings("unchecked")
+ PCollection<Solace.Record> records =
+ getFormatFunction() == null
+ ? (PCollection<Solace.Record>) input
+ : input.apply(
+ "Format records",
+ MapElements.into(TypeDescriptor.of(Solace.Record.class))
+ .via(checkNotNull(getFormatFunction())));
+
+ // Store the current window used by the input
+ PCollection<Solace.PublishResult> captureWindow =
+ records.apply(
+ "Capture window", ParDo.of(new
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy
=
+ (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+ captureWindow.getWindowingStrategy();
+
+ PCollection<Solace.Record> withGlobalWindow =
+ records.apply("Global window", Window.into(new GlobalWindows()));
+
+ PCollection<KV<Integer, Solace.Record>> withShardKeys =
+ withGlobalWindow.apply(
+ "Add shard key",
+ ParDo.of(new
UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+ String label =
+ getWriterType() == WriterType.STREAMING ? "Publish (streaming)" :
"Publish (batched)";
+
+ PCollectionTuple solaceOutput = withShardKeys.apply(label,
getWriterTransform(destinationFn));
+
+ SolaceOutput output;
+ if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+ PCollection<Solace.PublishResult> failedPublish =
solaceOutput.get(FAILED_PUBLISH_TAG);
+ PCollection<Solace.PublishResult> successfulPublish =
+ solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+ output =
+ rewindow(
+ SolaceOutput.in(input.getPipeline(), failedPublish,
successfulPublish),
+ windowingStrategy);
+ } else {
+ LOG.info(
+ String.format(
+ "Solace.Write: omitting writer output because delivery mode is
%s",
+ getDeliveryMode()));
+ output = SolaceOutput.in(input.getPipeline(), null, null);
+ }
+
+ return output;
Review Comment:
Do both of these need to be the same output type? PublishResult contains a
boolean for success or failure, metrics and an error message. It seems to me
like you would want to provide the user a PCollection<Solace.Record> for
successes, PCollection<DlqErrorT> for failures and PCollection<SolaceStatsT>
for metrics.
See the note about using the new DLQ pattern as well.
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
@Override
public SolaceOutput expand(PCollection<T> input) {
- // TODO: will be sent in upcoming PR
- return SolaceOutput.in(input.getPipeline(), null, null);
+ Class<? super T> pcollClass =
checkNotNull(input.getTypeDescriptor()).getRawType();
+ boolean usingSolaceRecord =
+ pcollClass
+ .getTypeName()
+
.equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+ || pcollClass.isAssignableFrom(Solace.Record.class);
+
+ validateWriteTransform(usingSolaceRecord);
+
+ boolean usingDynamicDestinations = getDestination() == null;
+ SerializableFunction<Solace.Record, Destination> destinationFn;
+ if (usingDynamicDestinations) {
+ destinationFn = x ->
SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+ } else {
+ // Constant destination for all messages (same topic or queue)
+ // This should not be non-null, as nulls would have been flagged by the
+ // validateWriteTransform method
+ destinationFn = x -> checkNotNull(getDestination());
+ }
+
+ @SuppressWarnings("unchecked")
+ PCollection<Solace.Record> records =
+ getFormatFunction() == null
+ ? (PCollection<Solace.Record>) input
+ : input.apply(
+ "Format records",
+ MapElements.into(TypeDescriptor.of(Solace.Record.class))
+ .via(checkNotNull(getFormatFunction())));
+
+ // Store the current window used by the input
+ PCollection<Solace.PublishResult> captureWindow =
+ records.apply(
+ "Capture window", ParDo.of(new
UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy
=
+ (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+ captureWindow.getWindowingStrategy();
+
Review Comment:
I'm not sure if transplanting the windowing strategy is a valid operation.
Given that most connectors do not preserve the input windowing strategy, it
might not be expected behavior either.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]