iht commented on code in PR #32060:
URL: https://github.com/apache/beam/pull/32060#discussion_r1755176620
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1026,8 +1057,180 @@ abstract static class Builder<T> {
@Override
public SolaceOutput expand(PCollection<T> input) {
- // TODO: will be sent in upcoming PR
- return SolaceOutput.in(input.getPipeline(), null, null);
+ Class<? super T> pcollClass = checkNotNull(input.getTypeDescriptor()).getRawType();
+ boolean usingSolaceRecord =
+ pcollClass
+ .getTypeName()
+ .equals("org.apache.beam.sdk.io.solace.data.AutoValue_Solace_Record")
+ || pcollClass.isAssignableFrom(Solace.Record.class);
+
+ validateWriteTransform(usingSolaceRecord);
+
+ boolean usingDynamicDestinations = getDestination() == null;
+ SerializableFunction<Solace.Record, Destination> destinationFn;
+ if (usingDynamicDestinations) {
+ destinationFn = x -> SolaceIO.convertToJcsmpDestination(checkNotNull(x.getDestination()));
+ } else {
+ // Constant destination for all messages (same topic or queue)
+ // This should never be null, as nulls would have been flagged by the
+ // validateWriteTransform method
+ destinationFn = x -> checkNotNull(getDestination());
+ }
+
+ @SuppressWarnings("unchecked")
+ PCollection<Solace.Record> records =
+ getFormatFunction() == null
+ ? (PCollection<Solace.Record>) input
+ : input.apply(
+ "Format records",
+ MapElements.into(TypeDescriptor.of(Solace.Record.class))
+ .via(checkNotNull(getFormatFunction())));
+
+ // Store the current window used by the input
+ PCollection<Solace.PublishResult> captureWindow =
+ records.apply(
+ "Capture window", ParDo.of(new UnboundedSolaceWriter.RecordToPublishResultDoFn()));
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy =
+ (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
+ captureWindow.getWindowingStrategy();
+
Review Comment:
Some connectors do. "Transplanting" should work because all I do here is read
the settings of the input window and apply a window with the same settings to
the output. I need to apply a global window internally in the connector to
control parallelization, and I thought it was nicer to respect the input window
and reapply a window with the same settings to the output, rather than emit the
output in the internally applied global window (which would leak an
implementation detail to the user).
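
To make the sequence concrete, here is a toy sketch in plain Java of the capture, rewindow, restore pattern described above. It is not Beam code and not the code in this PR: `WindowSettings`, `Windowed`, `GLOBAL` and `write` are hypothetical stand-ins invented for illustration. It only tracks which settings are attached to a collection and does not model how Beam assigns elements to windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Toy model only: every type here is a hypothetical stand-in, not a Beam class.
final class WindowTransplantSketch {

  // Stand-in for the settings carried by a windowing strategy.
  static final class WindowSettings {
    final String windowFn;
    final long allowedLatenessMillis;

    WindowSettings(String windowFn, long allowedLatenessMillis) {
      this.windowFn = windowFn;
      this.allowedLatenessMillis = allowedLatenessMillis;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof WindowSettings)) {
        return false;
      }
      WindowSettings other = (WindowSettings) o;
      return windowFn.equals(other.windowFn)
          && allowedLatenessMillis == other.allowedLatenessMillis;
    }

    @Override
    public int hashCode() {
      return Objects.hash(windowFn, allowedLatenessMillis);
    }
  }

  // Stand-in for a collection: its elements plus the settings describing its window.
  static final class Windowed<T> {
    final List<T> elements;
    final WindowSettings settings;

    Windowed(List<T> elements, WindowSettings settings) {
      this.elements = elements;
      this.settings = settings;
    }

    // Same elements, different window settings.
    Windowed<T> rewindow(WindowSettings newSettings) {
      return new Windowed<>(elements, newSettings);
    }
  }

  // The window the connector uses internally (an implementation detail).
  static final WindowSettings GLOBAL = new WindowSettings("GlobalWindows", 0L);

  static Windowed<String> write(Windowed<String> input) {
    // 1. Capture the settings of the input window.
    WindowSettings captured = input.settings;
    // 2. Switch to the global window for the internal write path.
    Windowed<String> internal = input.rewindow(GLOBAL);
    // 3. Pretend to publish each record and collect a result per record.
    List<String> results = new ArrayList<>();
    for (String record : internal.elements) {
      results.add("published:" + record);
    }
    // 4. Reapply the captured settings so the caller never sees GLOBAL.
    return new Windowed<>(results, GLOBAL).rewindow(captured);
  }

  public static void main(String[] args) {
    WindowSettings fixed = new WindowSettings("FixedWindows(60s)", 5000L);
    Windowed<String> out = write(new Windowed<>(Arrays.asList("a", "b"), fixed));
    System.out.println(out.settings.windowFn + " " + out.elements);
  }
}
```

The point of step 4 is that the output carries the same settings as the input, so the internal global window stays invisible to the user.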