sjvanrossum commented on code in PR #32060:
URL: https://github.com/apache/beam/pull/32060#discussion_r1837847486
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -970,12 +969,14 @@ public Write<T> withSubmissionMode(SubmissionMode submissionMode) {
* <p>The Solace writer can either use the JCSMP modes in streaming or batched.
*
* <p>In streaming mode, the publishing latency will be lower, but the throughput will also be
- * lower.
+ * lower. todo validate the constant sec
Review Comment:
Fulfill or remove the todo.
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java:
##########
@@ -119,7 +122,7 @@ public MessageReceiver getReceiver() {
}
@Override
- public MessageProducer getProducer(SubmissionMode submissionMode) {
+ public MessageProducer getInitializeProducer(SubmissionMode submissionMode) {
Review Comment:
Rename to `getInitializedProducer`.
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/UnboundedSolaceWriter.java:
##########
@@ -165,16 +173,27 @@ public void publishResults(BeamContextWrapper context) {
maxFailed = Math.max(maxFailed, latency);
}
}
-
if (result.getPublished()) {
context.output(
- SUCCESSFUL_PUBLISH_TAG, result, getCurrentBundleTimestamp(), getCurrentBundleWindow());
+ SUCCESSFUL_PUBLISH_TAG, result, getCurrentBundleTimestamp(), GlobalWindow.INSTANCE);
} else {
- context.output(
- FAILED_PUBLISH_TAG, result, getCurrentBundleTimestamp(), getCurrentBundleWindow());
+ try {
+ BadRecord b =
+ BadRecord.fromExceptionInformation(
+ result,
+ null,
+ null,
+ result.getError() != null
+ ? checkNotNull(result.getError())
+ : "SolaceIO.Write: unknown error.");
Review Comment:
Create a new variable, assign it `result.getError()`, and then perform the null check against that variable instead. Error Prone keeps complaining because the two calls to `getError()` may return two different results; storing the return value once is what you want here. Alternatively, you could use
`Optional.ofNullable(result.getError()).orElse("SolaceIO.Write: unknown error.")`.
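A minimal, self-contained sketch of both suggestions, assuming a simplified stand-in for `Solace.PublishResult` whose `getError()` may return null. The stand-in class and the `errorMessageWith*` helper names are hypothetical, and the surrounding `BadRecord.fromExceptionInformation` call is omitted:

```java
import java.util.Optional;

public class ErrorMessageExample {
  static final String UNKNOWN_ERROR = "SolaceIO.Write: unknown error.";

  /** Hypothetical stand-in for Solace.PublishResult; only getError() matters here. */
  static final class PublishResult {
    private final String error; // may be null when the failure reason is unknown

    PublishResult(String error) {
      this.error = error;
    }

    String getError() {
      return error;
    }
  }

  /** Option 1: call getError() once, store it in a local, then null-check the local. */
  static String errorMessageWithLocal(PublishResult result) {
    String error = result.getError();
    return error != null ? error : UNKNOWN_ERROR;
  }

  /** Option 2: let Optional handle the null case; getError() is still called only once. */
  static String errorMessageWithOptional(PublishResult result) {
    return Optional.ofNullable(result.getError()).orElse(UNKNOWN_ERROR);
  }

  public static void main(String[] args) {
    PublishResult failed = new PublishResult("connection lost");
    PublishResult unknown = new PublishResult(null);
    System.out.println(errorMessageWithLocal(failed)); // connection lost
    System.out.println(errorMessageWithOptional(unknown)); // SolaceIO.Write: unknown error.
  }
}
```

Either way, the value that is null-checked is the same value that gets used, which is what the nullness checker needs to see.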
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -1079,21 +1089,11 @@ public SolaceOutput expand(PCollection<T> input) {
MapElements.into(TypeDescriptor.of(Solace.Record.class))
.via(checkNotNull(getFormatFunction())));
- // Store the current window used by the input
- PCollection<Solace.PublishResult> captureWindow =
records.apply("Capture window", ParDo.of(new RecordToPublishResultDoFn()));
-
- @SuppressWarnings("unchecked")
WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy =
- (WindowingStrategy<Solace.PublishResult, BoundedWindow>)
- captureWindow.getWindowingStrategy();
-
PCollection<Solace.Record> withGlobalWindow =
records.apply("Global window", Window.into(new GlobalWindows()));
PCollection<KV<Integer, Solace.Record>> withShardKeys =
- withGlobalWindow.apply(
- "Add shard key", ParDo.of(new AddShardKeyDoFn(getMaxNumOfUsedWorkers())));
+ withGlobalWindow.apply("Add shard key", ParDo.of(new AddShardKeyDoFn(getNumShards())));
Review Comment:
Deferring GroupIntoBatches (GIB) to another PR?
##########
sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIO.java:
##########
@@ -368,8 +368,8 @@ public static Write<Row> writeRows(String to, CSVFormat csvFormat) {
*
* <pre>{@code
* // SomeDataClass is a data class configured for Beam to automatically infer its Schema.
- * @DefaultSchema(AutoValueSchema.class)
- * @AutoValue
+ * {@literal @}DefaultSchema(AutoValueSchema.class)
+ * {@literal @}AutoValue
Review Comment:
Remove, not in scope for this PR.
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/SolaceIO.java:
##########
@@ -998,7 +999,12 @@ public Write<T> withSessionServiceFactory(SessionServiceFactory factory) {
return toBuilder().setSessionServiceFactory(factory).build();
}
- abstract int getMaxNumOfUsedWorkers();
+ /** todo docs */
+ public Write<T> withErrorHandler(ErrorHandler<BadRecord, ?> errorHandler) {
+ return toBuilder().setErrorHandler(errorHandler).build();
+ }
Review Comment:
Fulfill the todo. Also, is there a place in this class where `get*` and
`with*` methods are grouped? This seems out of place to me.
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/write/PublishResultsReceiver.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.solace.write;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.solace.data.Solace.PublishResult;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * This will receive all the publishing results asynchronously, from the callbacks done by Solace
+ * when the ack of publishing a persistent message is received. This is then used by the finish
+ * bundle method of the writer to emit the corresponding results as the output of the write
+ * connector.
+ */
+@Internal
+public final class PublishResultsReceiver {
Review Comment:
SGTM. Is there any need for this class at all if it's just a wrapper around a queue?
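To make the question concrete, here is a minimal, stdlib-only sketch of what such a receiver reduces to when it only delegates to a `ConcurrentLinkedQueue`. The class body is not shown in this hunk, so the method names `addResult`, `pollResults` and `resultsQueueSize`, the `drain` helper, and the `PublishResult` stand-in are all hypothetical; two plain threads stand in for Solace's asynchronous ack callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class PublishResultsReceiverSketch {
  /** Hypothetical stand-in for Solace.PublishResult. */
  static final class PublishResult {
    final String messageId;
    final boolean published;

    PublishResult(String messageId, boolean published) {
      this.messageId = messageId;
      this.published = published;
    }
  }

  /** Thin wrapper: every method is a one-line delegation to a thread-safe queue. */
  static final class PublishResultsReceiver {
    private final ConcurrentLinkedQueue<PublishResult> resultsQueue =
        new ConcurrentLinkedQueue<>();

    /** Called from the (simulated) ack callback threads. */
    void addResult(PublishResult result) {
      resultsQueue.add(result);
    }

    /** Returns the next result, or null if none has arrived yet. */
    PublishResult pollResults() {
      return resultsQueue.poll();
    }

    int resultsQueueSize() {
      return resultsQueue.size();
    }
  }

  /** Drains the results received so far, as a finish-bundle method might. */
  static List<PublishResult> drain(PublishResultsReceiver receiver) {
    List<PublishResult> drained = new ArrayList<>();
    PublishResult result;
    while ((result = receiver.pollResults()) != null) {
      drained.add(result);
    }
    return drained;
  }

  public static void main(String[] args) throws InterruptedException {
    PublishResultsReceiver receiver = new PublishResultsReceiver();
    Thread ack1 = new Thread(() -> receiver.addResult(new PublishResult("m1", true)));
    Thread ack2 = new Thread(() -> receiver.addResult(new PublishResult("m2", false)));
    ack1.start();
    ack2.start();
    ack1.join();
    ack2.join();
    System.out.println(drain(receiver).size()); // 2
  }
}
```

If the real class is this thin, the writer could hold a `ConcurrentLinkedQueue<PublishResult>` field directly; a named wrapper is mainly worth keeping if it adds behavior or deliberately narrows what callers can do with the queue.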