BenWhitehead commented on a change in pull request #14261:
URL: https://github.com/apache/beam/pull/14261#discussion_r627765515
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
##########
@@ -94,106 +108,124 @@ void handleWriteFailures(ContextAdapter<WriteFailure>
context, Instant timestamp
/**
* {@link DoFn} for Firestore V1 {@link BatchWriteRequest}s.
- * <p/>
- * Writes will be enqueued to be sent at a potentially
- * later time when more writes are available. This Fn attempts to maximize
throughput while
- * maintaining a high request success rate.
- * <p/>
- * All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
+ *
+ * <p>Writes will be enqueued to be sent at a potentially later time when
more writes are
+ * available. This Fn attempts to maximize throughput while maintaining a
high request success
+ * rate.
+ *
+ * <p>All request quality-of-service is managed via the instance of {@link
RpcQos} associated with
* the lifecycle of this Fn.
*/
- static abstract class BaseBatchWriteFn<Out> extends WindowAwareDoFn<Write,
Out> implements
- HasRpcAttemptContext {
- private static final Logger LOGGER =
LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
+ abstract static class BaseBatchWriteFn<OutT> extends WindowAwareDoFn<Write,
OutT>
+ implements HasRpcAttemptContext {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
private final JodaClock clock;
private final FirestoreStatefulComponentFactory
firestoreStatefulComponentFactory;
private final RpcQosOptions rpcQosOptions;
+ private final CounterFactory counterFactory;
+ private final V1FnRpcAttemptContext rpcAttemptContext;
// transient running state information, not important to any possible
checkpointing
- private transient FirestoreRpc firestoreRpc;
+ // worker scoped state
private transient RpcQos rpcQos;
- private transient String projectId;
+ private transient Counter writesSuccessful;
+ private transient Counter writesFailedRetryable;
+ private transient Counter writesFailedNonRetryable;
+ // bundle scoped state
+ private transient FirestoreStub firestoreStub;
+ private transient DatabaseRootName databaseRootName;
+
@VisibleForTesting
transient Queue<@NonNull WriteElement> writes = new
PriorityQueue<>(WriteElement.COMPARATOR);
- @VisibleForTesting
- transient int queueNextEntryPriority = 0;
- @SuppressWarnings("initialization.fields.uninitialized") // allow
transient fields to be managed by component lifecycle
+ @VisibleForTesting transient int queueNextEntryPriority = 0;
+
+ @SuppressWarnings(
+ "initialization.fields.uninitialized") // allow transient fields to be
managed by component
+ // lifecycle
BaseBatchWriteFn(
JodaClock clock,
FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
- RpcQosOptions rpcQosOptions
- ) {
+ RpcQosOptions rpcQosOptions,
+ CounterFactory counterFactory) {
this.clock = clock;
this.firestoreStatefulComponentFactory =
firestoreStatefulComponentFactory;
this.rpcQosOptions = rpcQosOptions;
- }
-
- Logger getLogger() {
- return LOGGER;
+ this.counterFactory = counterFactory;
+ this.rpcAttemptContext = V1FnRpcAttemptContext.BatchWrite;
}
@Override
public Context getRpcAttemptContext() {
- return FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite;
+ return rpcAttemptContext;
}
@Override
- public final void
populateDisplayData(@edu.umd.cs.findbugs.annotations.NonNull
DisplayData.Builder builder) {
- builder
- .include("rpcQosOptions", rpcQosOptions);
+ public final void populateDisplayData(
+ @edu.umd.cs.findbugs.annotations.NonNull DisplayData.Builder builder) {
Review comment:
Leftover codegen; will clean up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org