danthev commented on a change in pull request #14261:
URL: https://github.com/apache/beam/pull/14261#discussion_r598997452



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java
##########
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.cloud.firestore.spi.v1.FirestoreRpc;
+import com.google.common.collect.ImmutableList;
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.BatchWriteResponse;
+import com.google.firestore.v1.Write;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.WindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.FailedWritesException;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.WriteFailure;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.Element;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcWriteAttempt.FlushBuffer;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.BackOffUtils;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each 
of the supported write
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1WriteFn {
+
+  /**
+   * Batch write {@link DoFn} that does not emit failed writes. Instead, any failures reported for
+   * a batch are collected and thrown together as a single {@link FailedWritesException}.
+   */
+  static final class DefaultBatchWriteFn extends BaseBatchWriteFn<Void> {
+    DefaultBatchWriteFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    /**
+     * Always throws a {@link FailedWritesException} containing every {@link WriteFailure} from
+     * {@code writeFailures}, in list order. {@code context} and {@code timestamp} are unused, and
+     * {@code logMessage} is intentionally not run here.
+     */
+    @Override
+    protected void handleWriteFailures(
+        ContextAdapter<Void> context,
+        Instant timestamp,
+        List<WriteFailureDetails> writeFailures,
+        Runnable logMessage) {
+      List<WriteFailure> failures = new ArrayList<>(writeFailures.size());
+      for (WriteFailureDetails details : writeFailures) {
+        failures.add(details.failure);
+      }
+      throw new FailedWritesException(failures);
+    }
+  }
+
+  /**
+   * Batch write {@link DoFn} that routes failed writes to its output (a dead letter queue) rather
+   * than throwing. Each {@link WriteFailure} is emitted with the supplied timestamp and the window
+   * recorded alongside it.
+   */
+  static final class BatchWriteFnWithDeadLetterQueue extends BaseBatchWriteFn<WriteFailure> {
+    BatchWriteFnWithDeadLetterQueue(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    /** Runs {@code logMessage} once, then outputs every failure in list order. */
+    @Override
+    protected void handleWriteFailures(
+        ContextAdapter<WriteFailure> context,
+        Instant timestamp,
+        List<WriteFailureDetails> writeFailures,
+        Runnable logMessage) {
+      logMessage.run();
+      writeFailures.forEach(
+          details -> context.output(details.failure, timestamp, details.window));
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link BatchWriteRequest}s.
+   * <p/>
+   * Writes will be enqueued to be sent at a potentially
+   * later time when more writes are available. This Fn attempts to maximize 
throughput while
+   * maintaining a high request success rate.
+   * <p/>
+   * All request quality-of-service is managed via the instance of {@link 
RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static abstract class BaseBatchWriteFn<Out> extends WindowAwareDoFn<Write, 
Out> implements
+      HasRpcAttemptContext {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite.getNamespace());
+    private final JodaClock clock;
+    private final FirestoreStatefulComponentFactory 
firestoreStatefulComponentFactory;
+    private final RpcQosOptions rpcQosOptions;
+
+    // transient running state information, not important to any possible 
checkpointing
+    private transient FirestoreRpc firestoreRpc;
+    private transient RpcQos rpcQos;
+    private transient String projectId;
+    @VisibleForTesting
+    transient Queue<@NonNull WriteElement> writes = new 
PriorityQueue<>(WriteElement.COMPARATOR);
+    @VisibleForTesting
+    transient int queueNextEntryPriority = 0;
+
+    @SuppressWarnings("initialization.fields.uninitialized") // allow 
transient fields to be managed by component lifecycle
+    protected BaseBatchWriteFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions
+    ) {
+      this.clock = clock;
+      this.firestoreStatefulComponentFactory = 
firestoreStatefulComponentFactory;
+      this.rpcQosOptions = rpcQosOptions;
+    }
+
+    protected Logger getLogger() {
+      return LOGGER;
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchWrite;
+    }
+
+    @Override
+    public final void 
populateDisplayData(@edu.umd.cs.findbugs.annotations.NonNull 
DisplayData.Builder builder) {
+      builder
+          .include("rpcQosOptions", rpcQosOptions);
+    }
+
+    @Override
+    public void setup() {
+      rpcQos = firestoreStatefulComponentFactory.getRpcQos(rpcQosOptions);
+      writes = new PriorityQueue<>(WriteElement.COMPARATOR);
+    }
+
+    @Override
+    public final void startBundle(StartBundleContext c) {
+      String project = 
c.getPipelineOptions().as(GcpOptions.class).getProject();
+      projectId = requireNonNull(project, "project must be defined on 
GcpOptions of PipelineOptions");
+      firestoreRpc = 
firestoreStatefulComponentFactory.getFirestoreRpc(c.getPipelineOptions());
+    }
+
+    /**
+     * For each element extract and enqueue all writes from the commit. Then 
potentially flush any
+     * previously and currently enqueued writes.
+     * <p/>
+     * In order for writes to be enqueued the value of {@link 
BatchWriteRequest#getDatabase()} must
+     * match exactly with the database name this instance is configured for 
via the provided {@link
+     * org.apache.beam.sdk.options.PipelineOptions PipelineOptions}
+     * <p/>
+     * {@inheritDoc}
+     */
+    @Override
+    public void processElement(ProcessContext context, BoundedWindow window) 
throws Exception {
+      @SuppressWarnings("nullness") // for some reason requireNonNull thinks 
its parameter must be non-null...
+      Write write = requireNonNull(context.element(), "context.element() must 
be non null");
+      ProcessContextAdapter<Out> contextAdapter = new 
ProcessContextAdapter<>(context);
+      int serializedSize = write.getSerializedSize();
+      boolean tooLarge = rpcQos.bytesOverLimit(serializedSize);
+      if (tooLarge) {
+        String message = String.format(
+            "%s for document '%s' larger than configured max allowed bytes per 
batch",
+            getWriteType(write),
+            getName(write)
+        );
+        handleWriteFailures(contextAdapter, clock.instant(),
+            ImmutableList.of(new WriteFailureDetails(
+                new WriteFailure(
+                    write,
+                    WriteResult.newBuilder().build(),
+                    Status.newBuilder()
+                        .setCode(Code.INVALID_ARGUMENT.getNumber())
+                        .setMessage(message)
+                        .build()
+                ),
+                window
+            )),
+            () -> LOGGER.info(message)
+        );
+      } else {
+        writes.offer(new WriteElement(queueNextEntryPriority++, write, 
window));
+        flushBatch(/* finalFlush */ false, contextAdapter);
+      }
+    }
+
+    /**
+     * Attempt to flush any outstanding enqueued writes before cleaning up any 
bundle related state.
+     * {@inheritDoc}
+     */
+    @SuppressWarnings("nullness") // allow clearing transient fields
+    @Override
+    public void finishBundle(FinishBundleContext context) throws Exception {
+      try {
+        flushBatch(/* finalFlush */ true, new 
FinishBundleContextAdapter<>(context));
+      } finally {
+        projectId = null;
+        firestoreRpc.close();
+      }
+    }
+
+    /**
+     * Possibly flush enqueued writes to Firestore.
+     * <p/>
+     * This flush attempts to maximize throughput and success rate of RPCs. 
When a flush should
+     * happen and how many writes are included is determined and managed by 
the {@link RpcQos}
+     * instance of this class.
+     *
+     * @param finalFlush A boolean specifying if this call is from {@link 
#finishBundle(DoFn.FinishBundleContext)}. If

Review comment:
       What I was trying to say is that, semantically, I would use the Javadoc to describe
what a parameter does, with the "why" as a side note. So the added second sentence is
good, and with the rename it now matches what the first sentence says.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to