iht commented on code in PR #32060:
URL: https://github.com/apache/beam/pull/32060#discussion_r1760241993
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
@Override
public SolaceOutput expand(PCollection<T> input) {
- // TODO: will be sent in upcoming PR
- return SolaceOutput.in(input.getPipeline(), null, null);
+ Class<? super T> pcollClass = checkNotNull(input.getTypeDescriptor()).getRawType();
+ boolean usingSolaceRecord =
+ pcollClass
+ .getTypeName()
+ .equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+ || pcollClass.isAssignableFrom(Solace.Record.class);
+
+ validateWriteTransform(usingSolaceRecord);
+
+ boolean usingDynamicDestinations = getDestination() == null;
+ SerializableFunction<Solace.Record, Destination> destinationFn;
+ if (usingDynamicDestinations) {
+ destinationFn = x -> SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+ } else {
+ // Constant destination for all messages (same topic or queue)
+ // This should not be null, as nulls would have been flagged by the
+ // validateWriteTransform method
+ destinationFn = x -> checkNotNull(getDestination());
+ }
+
+ @SuppressWarnings("unchecked")
+ PCollection<Solace.Record> records =
+ getFormatFunction() == null
+ ? (PCollection<Solace.Record>) input
+ : input.apply(
+ "Format records",
+ MapElements.into(TypeDescriptor.of(Solace.Record.class))
+ .via(checkNotNull(getFormatFunction())));
+
+ // Store the current window used by the input
+ PCollection<Solace.PublishResult> captureWindow =
+ records.apply(
+ "Capture window", ParDo.of(new UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy =
+ (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+ captureWindow.getWindowingStrategy();
+
+ PCollection<Solace.Record> withGlobalWindow =
+ records.apply("Global window", Window.into(new GlobalWindows()));
+
+ PCollection<KV<Integer, Solace.Record>> withShardKeys =
+ withGlobalWindow.apply(
+ "Add shard key",
+ ParDo.of(new UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+
+ String label =
+ getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : "Publish (batched)";
+
+ PCollectionTuple solaceOutput = withShardKeys.apply(label, getWriterTransform(destinationFn));
+
+ SolaceOutput output;
+ if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
+ PCollection<Solace.PublishResult> failedPublish = solaceOutput.get(FAILED_PUBLISH_TAG);
+ PCollection<Solace.PublishResult> successfulPublish =
+ solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
+ output =
+ rewindow(
+ SolaceOutput.in(input.getPipeline(), failedPublish, successfulPublish),
+ windowingStrategy);
+ } else {
+ LOG.info(
+ String.format(
+ "Solace.Write: omitting writer output because delivery mode is %s",
+ getDeliveryMode()));
+ output = SolaceOutput.in(input.getPipeline(), null, null);
+ }
+
+ return output;
Review Comment:
The connector guarantees that an existing producer is reused. If the producer
has been closed, a new one is created. The connector itself only closes
producers when a worker is destroyed.

The receiver that gets all the callbacks is the same for all the producers (it
is a static variable, which I actually have to change), so even if a new
producer is created, it will receive the same callbacks as long as it is in
the same worker.

The static variable approach is actually buggy: two instances of the Write
connector will share the same receiver instance. I am changing it so that each
instance of the Write transform has a single receiver for its callbacks. The
same principle still applies: all the callbacks are always received in the
same object, and the producers are always reused for as long as Solace permits
(and closed if the worker is destroyed).
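
To illustrate the intended behaviour, here is a minimal sketch. It is not the
connector's actual code: `FakeProducer`, `CallbackReceiver` and
`WriterSession` are hypothetical names invented for this example, and no
Solace API is used. It only shows the two properties described above: a
producer is reused until it is closed, and every producer created by the same
Write instance reports to the same receiver object, while a second Write
instance gets its own receiver.

```java
import java.util.ArrayList;
import java.util.List;

public class ProducerReuseSketch {

  /** Hypothetical receiver: collects the callbacks (acks) of all producers of one writer. */
  static final class CallbackReceiver {
    private final List<String> acked = new ArrayList<>();

    synchronized void onAck(String messageId) {
      acked.add(messageId);
    }

    synchronized List<String> acked() {
      return new ArrayList<>(acked);
    }
  }

  /** Hypothetical stand-in for a producer; publishing immediately triggers the ack callback. */
  static final class FakeProducer {
    private final CallbackReceiver receiver;
    private boolean closed = false;

    FakeProducer(CallbackReceiver receiver) {
      this.receiver = receiver;
    }

    boolean isClosed() {
      return closed;
    }

    void close() {
      closed = true;
    }

    void publish(String messageId) {
      // Assumption for the sketch: the broker acks synchronously.
      receiver.onAck(messageId);
    }
  }

  /** One instance per Write transform (an instance field, not a static variable). */
  static final class WriterSession {
    private final CallbackReceiver receiver = new CallbackReceiver();
    private FakeProducer producer;

    /** Reuses the current producer; creates a new one only if none exists or it was closed. */
    synchronized FakeProducer getOrCreateProducer() {
      if (producer == null || producer.isClosed()) {
        producer = new FakeProducer(receiver);
      }
      return producer;
    }

    CallbackReceiver receiver() {
      return receiver;
    }

    /** Called when the worker is destroyed. */
    synchronized void teardown() {
      if (producer != null) {
        producer.close();
      }
    }
  }

  public static void main(String[] args) {
    WriterSession session = new WriterSession();
    FakeProducer first = session.getOrCreateProducer();
    first.publish("m1");
    first.close(); // e.g. the broker dropped the producer
    FakeProducer second = session.getOrCreateProducer();
    second.publish("m2");
    // Both acks arrive at the same receiver even though the producer changed.
    System.out.println(session.receiver().acked());
    session.teardown();
  }
}
```

With a static receiver, the two `WriterSession` objects in a pipeline with two
Write transforms would see each other's callbacks; keeping the receiver as an
instance field avoids that.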