reuvenlax commented on a change in pull request #11767:
URL: https://github.com/apache/beam/pull/11767#discussion_r594872111



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFinalizeWritesDoFn.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse;
+import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
+import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This DoFn finalizes and commits Storage API streams. */
+class StorageApiFinalizeWritesDoFn extends DoFn<KV<String, String>, Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(StorageApiFinalizeWritesDoFn.class);
+
+  private final Counter finalizeOperationsSent =
+      Metrics.counter(StorageApiFinalizeWritesDoFn.class, "finalizeOperationsSent");
+  private final Counter finalizeOperationsSucceeded =
+      Metrics.counter(StorageApiFinalizeWritesDoFn.class, "finalizeOperationsSucceeded");
+  private final Counter finalizeOperationsFailed =
+      Metrics.counter(StorageApiFinalizeWritesDoFn.class, "finalizeOperationsFailed");
+  private final Counter batchCommitOperationsSent =
+      Metrics.counter(StorageApiFinalizeWritesDoFn.class, "batchCommitOperationsSent");
+  private final Counter batchCommitOperationsSucceeded =
+      Metrics.counter(StorageApiFinalizeWritesDoFn.class, "batchCommitOperationsSucceeded");
+  private final Counter batchCommitOperationsFailed =
+      Metrics.counter(StorageApiFinalizeWritesDoFn.class, "batchCommitOperationsFailed");
+
+  private Map<String, Collection<String>> commitStreams;
+  private final BigQueryServices bqServices;
+  @Nullable private DatasetService datasetService;
+
+  public StorageApiFinalizeWritesDoFn(BigQueryServices bqServices) {
+    this.bqServices = bqServices;
+    this.commitStreams = Maps.newHashMap();
+    this.datasetService = null;
+  }
+
+  private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
+    if (datasetService == null) {
+      datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+    }
+    return datasetService;
+  }
+
+  @StartBundle
+  public void startBundle() throws IOException {
+    commitStreams = Maps.newHashMap();
+  }
+
+  @ProcessElement
+  @SuppressWarnings({"nullness"})
+  public void process(PipelineOptions pipelineOptions, @Element KV<String, String> element)
+      throws Exception {
+    String tableId = element.getKey();
+    String streamId = element.getValue();
+    DatasetService datasetService = getDatasetService(pipelineOptions);
+
+    RetryManager<FinalizeWriteStreamResponse, Context<FinalizeWriteStreamResponse>> retryManager =
+        new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 3);
+    retryManager.addOperation(
+        c -> {
+          finalizeOperationsSent.inc();
+          return datasetService.finalizeWriteStream(streamId);
+        },
+        contexts -> {
+          LOG.error(
+              "Finalize of stream "
+                  + streamId
+                  + " failed with "
+                  + Iterables.getFirst(contexts, null).getError());
+          finalizeOperationsFailed.inc();
+          return RetryType.RETRY_ALL_OPERATIONS;
+        },
+        c -> {
+          LOG.info("Finalize of stream " + streamId + " finished with " + c.getResult());
+          finalizeOperationsSucceeded.inc();
+          commitStreams.computeIfAbsent(tableId, d -> Lists.newArrayList()).add(streamId);
+        },
+        new Context<>());
+    retryManager.run(true);
+  }
+
+  @FinishBundle
+  @SuppressWarnings({"nullness"})
+  public void finishBundle(PipelineOptions pipelineOptions) throws Exception {
+    DatasetService datasetService = getDatasetService(pipelineOptions);
+    for (Map.Entry<String, Collection<String>> entry : commitStreams.entrySet()) {
+      final String tableId = entry.getKey();
+      final Collection<String> streamNames = entry.getValue();
+      RetryManager<BatchCommitWriteStreamsResponse, Context<BatchCommitWriteStreamsResponse>>
+          retryManager =
+              new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 3);
+      retryManager.addOperation(
+          c -> {
+            batchCommitOperationsSent.inc();

Review comment:
       Ok, retrying just the correct streams ended up not being difficult. PTAL
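
For context on the fix: the v1beta2 batch commit response reports per-stream errors, so one way to retry only the streams that still need committing is to drop any stream the response marks as already committed. A minimal sketch under that assumption (illustrative, not the exact code in this PR):

import com.google.cloud.bigquery.storage.v1beta2.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1beta2.StorageError;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

class RetryOnlyFailedStreamsSketch {
  // Sketch: keep only the streams that still need committing on the next attempt.
  // STREAM_ALREADY_COMMITTED means that stream's data is already durable.
  static Set<String> streamsToRetry(
      Collection<String> attempted, BatchCommitWriteStreamsResponse response) {
    Set<String> retry = new HashSet<>(attempted);
    for (StorageError error : response.getStreamErrorsList()) {
      if (error.getCode() == StorageError.StorageErrorCode.STREAM_ALREADY_COMMITTED) {
        retry.remove(error.getEntity()); // already committed; drop from the retry set
      }
    }
    return retry;
  }
}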

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -1724,7 +1725,8 @@ static String getExtractDestinationUri(String extractDestinationDir) {
       * href="https://cloud.google.com/bigquery/streaming-data-into-bigquery">Streaming Data into
       * BigQuery</a>.
        */
-      STREAMING_INSERTS
+      STREAMING_INSERTS,
+      STORAGE_API_WRITES

Review comment:
       Experimental is generally used for Beam-level items, so it seems a bit strange here. I'll add a comment instead. (BTW, the experimental nature is why I didn't add example usage to the Javadoc.)

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -1664,6 +1664,7 @@ static String getExtractDestinationUri(String extractDestinationDir) {
         .setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
         .setSchemaUpdateOptions(Collections.emptySet())
         .setNumFileShards(0)
+        .setNumStorageApiStreams(0)

Review comment:
       It's mostly because AutoValue will complain if it's not set (e.g. if not 
using the storage api)
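
For context, a minimal sketch of the AutoValue behavior being referenced: the generated build() throws if a primitive property was never set, which is why every construction path must supply a default. Names below are illustrative, not from the PR:

import com.google.auto.value.AutoValue;

@AutoValue
abstract class WriteConfig {
  abstract int numStorageApiStreams(); // primitive, so it has no "unset" state

  static Builder builder() {
    return new AutoValue_WriteConfig.Builder();
  }

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setNumStorageApiStreams(int numStorageApiStreams);

    // Throws IllegalStateException("Missing required properties: numStorageApiStreams")
    // if setNumStorageApiStreams was never called.
    abstract WriteConfig build();
  }
}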

##########
File path: examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryStorageAPIStreamingIT.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.cookbook;
+
+import com.google.auto.value.AutoValue;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BigQueryStorageAPIStreamingIT {
+  @DefaultSchema(AutoValueSchema.class)
+  @AutoValue
+  public abstract static class Value {
+    public abstract long getNumber();
+
+    @Nullable
+    public abstract ByteBuffer getPayload();
+  }
+
+  public interface Options extends TestPipelineOptions {
+    @Description("The number of records per second to generate.")
+    @Default.Integer(10000)
+    Integer getRecordsPerSecond();
+
+    void setRecordsPerSecond(Integer recordsPerSecond);
+
+    @Description("The size of the records to write in bytes.")
+    @Default.Integer(1024)
+    Integer getPayloadSizeBytes();
+
+    void setPayloadSizeBytes(Integer payloadSizeBytes);
+
+    @Description("Parallelism used for Storage API writes.")
+    @Default.Integer(5)
+    Integer getNumShards();
+
+    void setNumShards(Integer numShards);
+
+    @Description("Frequency to trigger appends. Each shard triggers 
independently.")
+    @Default.Integer(5)
+    Integer getTriggerFrequencySec();
+
+    void setTriggerFrequencySec(Integer triggerFrequencySec);
+
+    @Description("The table to write to.")
+    String getTargetTable();
+
+    void setTargetTable(String table);
+  }
+
+  @BeforeClass
+  public static void setUp() {
+    PipelineOptionsFactory.register(Options.class);
+  }
+
+  @Test
+  public void testStorageAPIStreaming() throws Exception {
+    Options options = TestPipeline.testingPipelineOptions().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+    final int payloadSizeBytes = options.getPayloadSizeBytes();
+
+    // Generate input.
+    PCollection<Value> values =
+        //   p.apply(GenerateSequence.from(1).to(1000000L))

Review comment:
       Fixed

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -1771,6 +1773,8 @@ static String getExtractDestinationUri(String extractDestinationDir) {
 
     abstract int getNumFileShards();
 
+    abstract int getNumStorageApiStreams();

Review comment:
       The full name is the Storage Write API. Updated the public API to use that name.
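
For readers, a hedged sketch of what the renamed user-facing surface might look like; the method and enum names below reflect the rename described above and are illustrative rather than quoted from this PR:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

class StorageWriteApiUsageSketch {
  BigQueryIO.Write<TableRow> configure() {
    return BigQueryIO.writeTableRows()
        .to("myproject:dataset.table") // hypothetical destination
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withNumStorageWriteApiStreams(5)
        .withTriggeringFrequency(Duration.standardSeconds(5));
  }
}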

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
##########
@@ -78,4 +78,19 @@
   Integer getBqStreamingApiLoggingFrequencySec();
 
   void setBqStreamingApiLoggingFrequencySec(Integer value);
+
+  @Description("If set, then BigQueryIO.Write will default to using the 
Storage API.")
+  @Default.Boolean(false)
+  Boolean getUseStorageApiWrites();

Review comment:
       done

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RetryManager.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Queues;
+import org.joda.time.Duration;
+
+/**
+ * Retry manager used by Storage API operations. This class manages a sequence of operations (e.g.
+ * sequential appends to a stream) and retries of those operations.
+ */
+class RetryManager<ResultT, ContextT extends Context<ResultT>> {
+  private Queue<Operation<ResultT, ContextT>> operations;
+  private final BackOff backoff;
+  private final ExecutorService executor;
+
+  enum RetryType {
+    DONT_RETRY,
+    RETRY_ALL_OPERATIONS
+  };
+
+  RetryManager(Duration initialBackoff, Duration maxBackoff, int maxRetries) {
+    this.operations = Queues.newArrayDeque();
+    backoff =
+        FluentBackoff.DEFAULT
+            .withInitialBackoff(initialBackoff)
+            .withMaxBackoff(maxBackoff)
+            .withMaxRetries(maxRetries)
+            .backoff();
+    this.executor = Executors.newCachedThreadPool();
+  }
+
+  static class Operation<ResultT, ContextT extends Context<ResultT>> {
+    static class Context<ResultT> {
+      private @Nullable Throwable error = null;
+      private @Nullable ResultT result = null;
+
+      public void setError(@Nullable Throwable error) {
+        this.error = error;
+      }
+
+      public @Nullable Throwable getError() {
+        return error;
+      }
+
+      public void setResult(@Nullable ResultT result) {
+        this.result = result;
+      }
+
+      public @Nullable ResultT getResult() {
+        return result;
+      }
+    }
+
+    private final Function<ContextT, ApiFuture<ResultT>> runOperation;
+    private final Function<Iterable<ContextT>, RetryType> onError;
+    private final Consumer<ContextT> onSuccess;
+    @Nullable private ApiFuture<ResultT> future = null;
+    @Nullable private Callback<ResultT> callback = null;
+    @Nullable ContextT context = null;
+
+    public Operation(
+        Function<ContextT, ApiFuture<ResultT>> runOperation,
+        Function<Iterable<ContextT>, RetryType> onError,
+        Consumer<ContextT> onSuccess,
+        ContextT context) {
+      this.runOperation = runOperation;
+      this.onError = onError;
+      this.onSuccess = onSuccess;
+      this.context = context;
+    }
+
+    @SuppressWarnings({"nullness"})
+    void run(Executor executor) {
+      this.future = runOperation.apply(context);
+      this.callback = new Callback<>();
+      ApiFutures.addCallback(future, callback, executor);
+    }
+
+    @SuppressWarnings({"nullness"})
+    boolean await() throws Exception {
+      callback.await();
+      return callback.getFailed();
+    }
+  }
+
+  private static class Callback<ResultT> implements ApiFutureCallback<ResultT> {
+    private final CountDownLatch waiter;
+    @Nullable private Throwable failure = null;
+    boolean failed = false;
+
+    Callback() {
+      this.waiter = new CountDownLatch(1);
+    }
+
+    void await() throws InterruptedException {
+      waiter.await();
+    }
+
+    boolean await(long timeoutSec) throws InterruptedException {
+      return waiter.await(timeoutSec, TimeUnit.SECONDS);
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      synchronized (this) {
+        failure = t;
+        failed = true;
+      }
+      waiter.countDown();
+    }
+
+    @Override
+    public void onSuccess(ResultT result) {
+      synchronized (this) {
+        failure = null;
+      }
+      waiter.countDown();
+    }
+
+    @Nullable
+    Throwable getFailure() {
+      synchronized (this) {
+        return failure;
+      }
+    }
+
+    boolean getFailed() {
+      synchronized (this) {
+        return failed;
+      }
+    }
+  }
+
+  void addOperation(
+      Function<ContextT, ApiFuture<ResultT>> runOperation,
+      Function<Iterable<ContextT>, RetryType> onError,
+      Consumer<ContextT> onSuccess,
+      ContextT context)
+      throws Exception {
+    addOperation(new Operation<>(runOperation, onError, onSuccess, context));
+  }
+
+  void addAndRunOperation(
+      Function<ContextT, ApiFuture<ResultT>> runOperation,
+      Function<Iterable<ContextT>, RetryType> onError,
+      Consumer<ContextT> onSuccess,
+      ContextT context)
+      throws Exception {
+    addAndRunOperation(new Operation<>(runOperation, onError, onSuccess, context));
+  }
+
+  void addOperation(Operation<ResultT, ContextT> operation) {
+    operations.add(operation);
+  }
+
+  void addAndRunOperation(Operation<ResultT, ContextT> operation) {
+    operation.run(executor);
+    operations.add(operation);
+  }
+
+  void run(boolean await) throws Exception {
+    for (Operation<ResultT, ContextT> operation : operations) {
+      operation.run(executor);
+    }
+    if (await) {
+      await();
+    }
+  }
+
+  @SuppressWarnings({"nullness"})
+  void await() throws Exception {
+    while (!this.operations.isEmpty()) {
+      Operation<ResultT, ContextT> operation = this.operations.element();
+      boolean failed = operation.await();
+      if (failed) {
+        Throwable failure = operation.callback.getFailure();
+        operation.context.setError(failure);
+        RetryType retryType =
+            operation.onError.apply(
+                operations.stream().map(o -> o.context).collect(Collectors.toList()));
+        if (retryType != RetryType.DONT_RETRY) {
+          Preconditions.checkState(RetryType.RETRY_ALL_OPERATIONS == retryType);
+          if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
+            throw new RuntimeException(failure);
+          }
+          for (Operation<ResultT, ?> awaitOperation : operations) {
+            awaitOperation.await();
+          }
+          // Run all the operations again.
+          run(false);
+        } else {

Review comment:
       done

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.bigquery.storage.v1beta2.FinalizeWriteStreamResponse;
+import com.google.cloud.bigquery.storage.v1beta2.FlushRowsResponse;
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
+import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
+import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn.Operation;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This DoFn flushes and optionally (if requested) finalizes Storage API streams. */
+public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>, Void> {
+  private static final Logger LOG = LoggerFactory.getLogger(StorageApiFlushAndFinalizeDoFn.class);
+
+  private final BigQueryServices bqServices;
+  @Nullable private DatasetService datasetService = null;
+  private final Counter flushOperationsSent =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, 
"flushOperationsSent");
+  private final Counter flushOperationsSucceeded =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, 
"flushOperationsSucceeded");
+  private final Counter flushOperationsFailed =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, 
"flushOperationsFailed");
+  private final Counter flushOperationsAlreadyExists =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, 
"flushOperationsAlreadyExists");
+  private final Counter flushOperationsInvalidArgument =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, 
"flushOperationsInvalidArgument");
+  private final Distribution flushLatencyDistribution =
+      Metrics.distribution(StorageApiFlushAndFinalizeDoFn.class, 
"flushOperationLatencyMs");
+  private final Counter finalizeOperationsSent =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, 
"finalizeOperationsSent");
+  private final Counter finalizeOperationsSucceeded =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, 
"finalizeOperationsSucceeded");
+  private final Counter finalizeOperationsFailed =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, 
"finalizeOperationsFailed");
+
+  @DefaultSchema(JavaFieldSchema.class)
+  static class Operation implements Comparable<Operation>, Serializable {
+    final long flushOffset;
+    final boolean finalizeStream;
+
+    @SchemaCreate
+    public Operation(long flushOffset, boolean finalizeStream) {
+      this.flushOffset = flushOffset;
+      this.finalizeStream = finalizeStream;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Operation operation = (Operation) o;
+      return flushOffset == operation.flushOffset && finalizeStream == operation.finalizeStream;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(flushOffset, finalizeStream);
+    }
+
+    @Override
+    public int compareTo(Operation other) {
+      int compValue = Long.compare(this.flushOffset, other.flushOffset);
+      if (compValue == 0) {
+        compValue = Boolean.compare(this.finalizeStream, other.finalizeStream);
+      }
+      return compValue;
+    }
+  }
+
+  public StorageApiFlushAndFinalizeDoFn(BigQueryServices bqServices) {
+    this.bqServices = bqServices;
+  }
+
+  private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
+    if (datasetService == null) {
+      datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+    }
+    return datasetService;
+  }
+
+  @SuppressWarnings({"nullness"})
+  @ProcessElement
+  public void process(PipelineOptions pipelineOptions, @Element KV<String, Operation> element)
+      throws Exception {
+    final String streamId = element.getKey();
+    final Operation operation = element.getValue();
+    final DatasetService datasetService = getDatasetService(pipelineOptions);
+    // Flush the stream. If the flush offset < 0, that means we only need to finalize.
+    long offset = operation.flushOffset;
+    if (offset >= 0) {
+      Instant now = Instant.now();
+      RetryManager<FlushRowsResponse, Context<FlushRowsResponse>> retryManager =
+          new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 3);
+      retryManager.addOperation(
+          // runOperation
+          c -> {
+            try {
+              flushOperationsSent.inc();
+              return datasetService.flush(streamId, offset);
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+          },
+          // onError
+          contexts -> {
+            Throwable error = Iterables.getFirst(contexts, null).getError();
+            LOG.warn(
+                "Flush of stream " + streamId + " to offset " + offset + " 
failed with " + error);
+            flushOperationsFailed.inc();
+            if (error instanceof ApiException) {
+              Code statusCode = ((ApiException) error).getStatusCode().getCode();
+              if (statusCode.equals(Code.ALREADY_EXISTS)) {
+                flushOperationsAlreadyExists.inc();
+                // Implies that we have already flushed up to this point, so don't retry.
+                return RetryType.DONT_RETRY;
+              }
+              if (statusCode.equals(Code.INVALID_ARGUMENT)) {

Review comment:
       Agreed, I would like a better error code, as per the TODO. However, right now I don't see a better way of doing this.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** This {@link PTransform} manages loads into BigQuery using the Storage API. */
+public class StorageApiLoads<DestinationT, ElementT>
+    extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StorageApiLoads.class);
+  static final int FILE_TRIGGERING_RECORD_COUNT = 100;
+
+  private final Coder<DestinationT> destinationCoder;
+  private final Coder<ElementT> elementCoder;
+  private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
+  private final CreateDisposition createDisposition;
+  private final String kmsKey;
+  private final Duration triggeringFrequency;
+  private final BigQueryServices bqServices;
+  private final int numShards;
+
+  public StorageApiLoads(
+      Coder<DestinationT> destinationCoder,
+      Coder<ElementT> elementCoder,
+      StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
+      CreateDisposition createDisposition,
+      String kmsKey,
+      Duration triggeringFrequency,
+      BigQueryServices bqServices,
+      int numShards) {
+    this.destinationCoder = destinationCoder;
+    this.elementCoder = elementCoder;
+    this.dynamicDestinations = dynamicDestinations;
+    this.createDisposition = createDisposition;
+    this.kmsKey = kmsKey;
+    this.triggeringFrequency = triggeringFrequency;
+    this.bqServices = bqServices;
+    this.numShards = numShards;
+  }
+
+  @Override
+  public WriteResult expand(PCollection<KV<DestinationT, ElementT>> input) {
+    return triggeringFrequency != null ? expandTriggered(input) : expandUntriggered(input);
+  }
+
+  public WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {

Review comment:
       I'm not sure what batching at the write DoFn you're referring to. Eventually I would like to support auto-sharding using GroupIntoBatches; however, that is not yet ready for use by this sink.
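
For reference, a minimal sketch of what runner-determined sharding via GroupIntoBatches could eventually look like here, assuming withShardedKey() support; the wrapper class and batch size are illustrative:

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class AutoShardingSketch {
  // Sketch: let the runner pick the sharding instead of a fixed numShards.
  static <DestinationT, ElementT>
      PCollection<KV<ShardedKey<DestinationT>, Iterable<ElementT>>> batch(
          PCollection<KV<DestinationT, ElementT>> input, Duration triggeringFrequency) {
    return input.apply(
        GroupIntoBatches.<DestinationT, ElementT>ofSize(100)
            .withMaxBufferingDuration(triggeringFrequency)
            .withShardedKey());
  }
}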

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
##########
@@ -123,6 +123,13 @@ public String getTableSpec() {
     return tableSpec;
   }
 
+  public String getTableUrn() {

Review comment:
       done

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
##########
@@ -135,16 +135,36 @@
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;

Review comment:
       AFAICT all tests are run against the new path, except for the ones that don't make sense (e.g. the ones that test specific functionality that doesn't exist in the Storage API path).
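
A minimal sketch of the Parameterized pattern in play here, fanning the same test methods out over both write paths; the option name is illustrative:

import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class WritePathSketchTest {
  @Parameterized.Parameters(name = "useStorageApi = {0}")
  public static Iterable<Object[]> data() {
    return Arrays.asList(new Object[][] {{false}, {true}});
  }

  @Parameterized.Parameter(0)
  public boolean useStorageApi;

  @Test
  public void runsOnBothPaths() {
    // Each test body can branch on useStorageApi to configure the sink under test.
  }
}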




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

