BenWhitehead commented on a change in pull request #14261:
URL: https://github.com/apache/beam/pull/14261#discussion_r605265378



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Status;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.concurrent.Immutable;
+import 
org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithDeadLetterQueue;
+import 
org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.DefaultBatchWriteFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * {@link FirestoreV1} provides an API which provides lifecycle managed {@link 
PTransform}s for
+ * <a target="_blank" rel="noopener noreferrer" 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1">Cloud
+ * Firestore v1 API</a>.
+ * <p/>
+ * This class is part of the Firestore Connector DSL and should be accessed 
via {@link
+ * FirestoreIO#v1()}.
+ * <p/>
+ * All {@link PTransform}s provided by this API use {@link 
org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * GcpOptions} on {@link org.apache.beam.sdk.options.PipelineOptions 
PipelineOptions} for
+ * credentials access and projectId resolution. As such, the lifecycle of gRPC 
clients and project
+ * information is scoped to the bundle level, not the worker level.
+ * <p/>
+ *
+ * <h3>Operations</h3>
+ * <h4>Write</h4>
+ * To write a {@link PCollection} to Cloud Firestore use {@link 
FirestoreV1#write()}, picking the
+ * behavior of the writer.
+ *
+ * Writes use Cloud Firestore's BatchWrite api which provides fine grained 
write semantics.
+ *
+ * The default behavior is to fail a bundle if any single write fails with a 
non-retryable error.
+ * <pre>{@code
+ * PCollection<Write> writes = ...;
+ * PDone sink = writes
+ *     .apply(FirestoreIO.v1().write().batchWrite().build());
+ * }</pre>
+ *
+ * Alternatively, if you'd rather output write failures to a Dead Letter Queue 
add
+ * {@link BatchWrite.Builder#withDeadLetterQueue() withDeadLetterQueue} when 
building your writer.
+ * <pre>{@code
+ * PCollection<Write> writes = ...;
+ * PCollection<WriteFailure> writeFailures = writes
+ *     
.apply(FirestoreIO.v1().write().batchWrite().withDeadLetterQueue().build());
+ * }</pre>
+ *
+ * <h3>Permissions</h3>
+ *
+ * Permission requirements depend on the {@code PipelineRunner} that is used 
to execute the
+ * pipeline. Please refer to the documentation of corresponding {@code 
PipelineRunner}s for more
+ * details.
+ *
+ * <p>Please see <a target="_blank" rel="noopener noreferrer" 
href="https://cloud.google.com/firestore/docs/quickstart-servers#create_a_in_native_mode_database">Create
+ * a Firestore in Native mode database
+ * </a>for security and permission related information specific to Cloud 
Firestore.
+ *
+ * <p>Optionally, Cloud Firestore V1 Emulator, running locally, could be used 
for testing purposes
+ * by providing the host port information vi {@link 
FirestoreIOOptions#setEmulatorHost(String)}.
+ * In such a case, all the Cloud Firestore API calls are directed to the 
Emulator.
+ *
+ * @see FirestoreIO#v1()
+ * @see org.apache.beam.sdk.PipelineRunner
+ * @see org.apache.beam.sdk.options.PipelineOptions
+ * @see org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * @see <a target="_blank" rel="noopener noreferrer" 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1">Cloud
+ * Firestore v1 API</a>
+ */
+@Immutable
+public final class FirestoreV1 {
+  static final FirestoreV1 INSTANCE = new FirestoreV1();
+
+  private FirestoreV1() {}
+
+  /**
+   * The class returned by this method provides the ability to create {@link 
PTransform PTransforms}
+   * for write operations available in the Firestore V1 API provided by {@link
+   * com.google.cloud.firestore.spi.v1.FirestoreRpc FirestoreRpc}.
+   * <p/>
+   * This method is part of the Firestore Connector DSL and should be accessed 
via {@link
+   * FirestoreIO#v1()}.
+   * <p/>
+   *
+   * @return Type safe builder factory for write operations.
+   * @see FirestoreIO#v1()
+   */
+  public final Write write() {
+    return Write.INSTANCE;
+  }
+
+  /**
+   * Type safe builder factory for write operations.
+   * <p/>
+   * This class is part of the Firestore Connector DSL and should be accessed 
via {@link #write()
+   * FirestoreIO.v1().write()}.
+   * <p/>
+   * <p/>
+   * This class provides access to a set of type safe builders for supported 
write operations
+   * available in the Firestore V1 API accessed through {@link 
com.google.cloud.firestore.spi.v1.FirestoreRpc
+   * FirestoreRpc}. Each builder allows configuration before creating an 
immutable instance which
+   * can be used in your pipeline.
+   *
+   * @see FirestoreIO#v1()
+   * @see #write()
+   */
+  @Immutable
+  public static final class Write {
+    private static final Write INSTANCE = new Write();
+
+    private Write() {}
+
+    /**
+     * Factory method to create a new type safe builder for {@link 
com.google.firestore.v1.Write}
+     * operations.
+     * <p/>
+     * By default, when an error is encountered while trying to write to Cloud 
Firestore a {@link
+     * FailedWritesException} will be thrown. If you would like a failed write 
to not result in a
+     * {@link FailedWritesException}, you can instead use {@link 
BatchWriteWithDeadLetterQueue} which
+     * will output any failed write. {@link BatchWriteWithDeadLetterQueue} can 
be used by
+     * including {@link BatchWrite.Builder#withDeadLetterQueue()} when 
constructing the write handler.
+     * <p/>
+     * This method is part of the Firestore Connector DSL and should be 
accessed via {@link
+     * FirestoreIO#v1()}.
+     * <p/>
+     *
+     * All request quality-of-service for the built {@link BatchWrite} 
PTransform is scoped to
+     * the worker and configured based on the {@link RpcQosOptions} specified 
via this builder.
+     *
+     * @return A new type safe builder providing configuration for processing 
of {@link
+     * com.google.firestore.v1.Write}s
+     * @see FirestoreIO#v1()
+     * @see BatchWrite
+     * @see BatchWriteRequest
+     * @see com.google.firestore.v1.BatchWriteResponse
+     * @see <a target="_blank" rel="noopener noreferrer" 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite">google.firestore.v1.Firestore.BatchWrite</a>
+     * @see <a target="_blank" rel="noopener noreferrer" 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest">google.firestore.v1.BatchWriteRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer" 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse">google.firestore.v1.BatchWriteResponse</a>
+     */
+    public BatchWrite.Builder batchWrite() {
+      return new BatchWrite.Builder();
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link 
PCollection}{@code <}{@link
+   * com.google.firestore.v1.Write}{@code >, }{@link PDone}{@code >} which 
will write to Firestore.
+   * <p/>
+   * If an error is encountered while trying to write to Cloud Firestore a 
{@link
+   * FailedWritesException} will be thrown. If you would like a failed write 
to not result in a
+   * {@link FailedWritesException}, you can instead use {@link 
BatchWriteWithDeadLetterQueue} which
+   * will instead output any failed write. {@link 
BatchWriteWithDeadLetterQueue } can be used by
+   * including {@link Builder#withDeadLetterQueue()} when constructing the 
write handler.
+   * <p/>
+   * This class is part of the Firestore Connector DSL, it has a type safe 
builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#write() 
write()}{@code .}{@link
+   * FirestoreV1.Write#batchWrite() batchWrite()}.
+   * <p/>
+   * All request quality-of-service for an instance of this PTransform is 
scoped to the worker and
+   * configured via {@link Builder#withRpcQosOptions(RpcQosOptions)}.
+   * <p/>
+   * Writes performed against Firestore will be ordered and grouped to 
maximize throughput while
+   * maintaining a high request success rate. Batch sizes will be determined 
by the QOS layer.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#write()
+   * @see FirestoreV1.Write#batchWrite()
+   * @see BatchWrite.Builder
+   * @see BatchWriteWithDeadLetterQueue
+   * @see BatchWriteRequest
+   * @see com.google.firestore.v1.BatchWriteResponse
+   * @see <a target="_blank" rel="noopener noreferrer" 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite">google.firestore.v1.Firestore.BatchWrite</a>
+   * @see <a target="_blank" rel="noopener noreferrer" 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest">google.firestore.v1.BatchWriteRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer" 
href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse">google.firestore.v1.BatchWriteResponse</a>
+   */
+  public static final class BatchWrite
+      extends Transform<
+      PCollection<com.google.firestore.v1.Write>,
+      PDone,
+      BatchWrite,
+      BatchWrite.Builder
+      > {
+
+    private BatchWrite(JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, 
RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PDone expand(PCollection<com.google.firestore.v1.Write> input) {
+      input.apply("batchWrite", ParDo.of(new DefaultBatchWriteFn(clock, 
firestoreStatefulComponentFactory, rpcQosOptions, CounterFactory.DEFAULT)));

Review comment:
       Writes to Firestore are generally idempotent. There are a few narrow 
cases where a precondition can be specified on an individual write (for 
example, perform an update only if `updatedAt` matches timeX). If a 
precondition fails, Firestore responds with a failure status for that 
individual write, and if the dead letter queue is used, the failed write is 
output as an individual `WriteFailure{write, writeResult, status}`.
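
To make the precondition case concrete, here is a minimal sketch in plain Java. Everything in it is a hypothetical stand-in written for illustration: `Write`, `Doc`, `WriteFailure` and `applyBatch` are simplified models, not the Beam connector or the Firestore client API, and the `"FAILED_PRECONDITION"` string is only an illustrative status label. It shows an unconditional write being safe to retry, while a write guarded by an `updatedAt` precondition fails on retry and is collected as an individual failure instead of failing the whole batch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model; NOT the Beam or Firestore API.
final class PreconditionSketch {

  /** Stored document: a value plus the time it was last updated. */
  static final class Doc {
    final String value;
    final long updatedAt;

    Doc(String value, long updatedAt) {
      this.value = value;
      this.updatedAt = updatedAt;
    }
  }

  /** A write; expectedUpdatedAt == null means "no precondition". */
  static final class Write {
    final String docId;
    final String value;
    final Long expectedUpdatedAt;

    Write(String docId, String value, Long expectedUpdatedAt) {
      this.docId = docId;
      this.value = value;
      this.expectedUpdatedAt = expectedUpdatedAt;
    }
  }

  /** Loosely mirrors WriteFailure{write, writeResult, status}; writeResult omitted. */
  static final class WriteFailure {
    final Write write;
    final String status; // illustrative label only

    WriteFailure(Write write, String status) {
      this.write = write;
      this.status = status;
    }
  }

  /** Applies each write independently and returns the failures (the "dead letters"). */
  static List<WriteFailure> applyBatch(Map<String, Doc> store, List<Write> batch, long now) {
    List<WriteFailure> deadLetters = new ArrayList<>();
    for (Write w : batch) {
      Doc current = store.get(w.docId);
      boolean preconditionOk =
          w.expectedUpdatedAt == null
              || (current != null && current.updatedAt == w.expectedUpdatedAt);
      if (preconditionOk) {
        store.put(w.docId, new Doc(w.value, now));
      } else {
        // Only this write fails; the rest of the batch is unaffected.
        deadLetters.add(new WriteFailure(w, "FAILED_PRECONDITION"));
      }
    }
    return deadLetters;
  }

  public static void main(String[] args) {
    Map<String, Doc> store = new HashMap<>();
    store.put("a", new Doc("v0", 100L));

    List<Write> batch = new ArrayList<>();
    batch.add(new Write("a", "v1", 100L)); // update only if updatedAt == 100
    batch.add(new Write("b", "v1", null)); // unconditional write

    List<WriteFailure> first = applyBatch(store, batch, 200L);
    List<WriteFailure> retry = applyBatch(store, batch, 300L);
    System.out.println(first.size() + " " + retry.size());
  }
}
```

In this model the first attempt produces no failures, and a retry of the same batch produces exactly one: the guarded write to `a`, because its `updatedAt` no longer matches. The unconditional write to `b` succeeds both times.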



