This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 989a2198a31 Allow large timestamp skew for at-least-once streaming (#29858) 989a2198a31 is described below commit 989a2198a3174a66cd733834383f3603de70d1fa Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com> AuthorDate: Wed Dec 27 12:16:53 2023 +0300 Allow large timestamp skew for at-least-once streaming (#29858) * large skew * test * use AppendSerializationError everywhere --- .../bigquery/StorageApiWriteUnshardedRecords.java | 11 ++- .../bigquery/StorageApiWritesShardedRecords.java | 6 +- .../sdk/io/gcp/testing/FakeDatasetService.java | 2 +- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 109 +++++++++++++++++++++ .../io/gcp/bigquery/BigQuerySinkMetricsTest.java | 4 +- 5 files changed, 123 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 8f24ebc8ad9..3c6c73dd021 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -670,10 +670,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError()); if (failedContext.getError() != null - && failedContext.getError() instanceof Exceptions.AppendSerializtionError) { - Exceptions.AppendSerializtionError error = + && failedContext.getError() instanceof Exceptions.AppendSerializationError) { + Exceptions.AppendSerializationError error = Preconditions.checkStateNotNull( - (Exceptions.AppendSerializtionError) failedContext.getError()); + (Exceptions.AppendSerializationError) failedContext.getError()); 
Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet(); for (int failedIndex : failedRowIndices) { @@ -1164,5 +1164,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> throw new RuntimeException(e); } } + + @Override + public Duration getAllowedTimestampSkew() { + return Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 8cf8ad0ee02..0f9b07d0c40 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -662,10 +662,10 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object // failedInserts // PCollection, and retry with the remaining rows. 
if (failedContext.getError() != null - && failedContext.getError() instanceof Exceptions.AppendSerializtionError) { - Exceptions.AppendSerializtionError error = + && failedContext.getError() instanceof Exceptions.AppendSerializationError) { + Exceptions.AppendSerializationError error = Preconditions.checkArgumentNotNull( - (Exceptions.AppendSerializtionError) failedContext.getError()); + (Exceptions.AppendSerializationError) failedContext.getError()); Set<Integer> failedRowIndices = error.getRowIndexToErrorMessage().keySet(); for (int failedIndex : failedRowIndices) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index 6a50127acd8..1e746d7f96b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -680,7 +680,7 @@ public class FakeDatasetService implements DatasetService, WriteStreamService, S } if (!rowIndexToErrorMessage.isEmpty()) { return ApiFutures.immediateFailedFuture( - new Exceptions.AppendSerializtionError( + new Exceptions.AppendSerializationError( Code.INVALID_ARGUMENT.getNumber(), "Append serialization failed for writer: " + streamName, stream.streamName, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 720419f2227..55269342155 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -35,6 +35,8 @@ import static 
org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.ErrorProto; import com.google.api.services.bigquery.model.Job; @@ -48,7 +50,12 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; import com.google.auto.value.AutoValue; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; +import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.Exceptions; +import com.google.cloud.bigquery.storage.v1.ProtoRows; import com.google.protobuf.ByteString; +import com.google.protobuf.DescriptorProtos; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -124,14 +131,18 @@ import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.PeriodicImpulse; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; import 
org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.Window; @@ -3104,6 +3115,104 @@ public class BigQueryIOWriteTest implements Serializable { containsInAnyOrder(Iterables.toArray(rows, TableRow.class))); } + public static class ThrowingFakeDatasetServices extends FakeDatasetService { + @Override + public BigQueryServices.StreamAppendClient getStreamAppendClient( + String streamName, + DescriptorProtos.DescriptorProto descriptor, + boolean useConnectionPool, + AppendRowsRequest.MissingValueInterpretation missingValueInterpretation) { + return new BigQueryServices.StreamAppendClient() { + @Override + public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows) { + Map<Integer, String> errorMap = new HashMap<>(); + for (int i = 0; i < rows.getSerializedRowsCount(); i++) { + errorMap.put(i, "some serialization error"); + } + SettableApiFuture<AppendRowsResponse> appendResult = SettableApiFuture.create(); + appendResult.setException( + new Exceptions.AppendSerializationError( + 404, "some description", "some stream", errorMap)); + return appendResult; + } + + @Override + public com.google.cloud.bigquery.storage.v1.@Nullable TableSchema getUpdatedSchema() { + return null; + } + + @Override + public void pin() {} + + @Override + public void unpin() {} + + @Override + public void close() {} + }; + } + } + + @Test + public void testStorageWriteReturnsAppendSerializationError() throws Exception { + assumeTrue(useStorageApi); + assumeTrue(useStreaming); + p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdRecordCount(5); + + TableSchema schema = + new TableSchema() + .setFields(Arrays.asList(new TableFieldSchema().setType("INTEGER").setName("long"))); + Table fakeTable = new Table(); + TableReference ref = + new TableReference() + .setProjectId("project-id") + .setDatasetId("dataset-id") + .setTableId("table-id"); + fakeTable.setSchema(schema); + fakeTable.setTableReference(ref); + + 
ThrowingFakeDatasetServices throwingService = new ThrowingFakeDatasetServices(); + throwingService.createTable(fakeTable); + + int numRows = 100; + + WriteResult res = + p.apply( + PeriodicImpulse.create() + .startAt(Instant.ofEpochMilli(0)) + .stopAfter(Duration.millis(numRows - 1)) + .withInterval(Duration.millis(1))) + .apply( + "Convert to longs", + MapElements.into(TypeDescriptor.of(TableRow.class)) + .via(instant -> new TableRow().set("long", instant.getMillis()))) + .apply( + BigQueryIO.writeTableRows() + .to(ref) + .withSchema(schema) + .withTestServices( + new FakeBigQueryServices() + .withDatasetService(throwingService) + .withJobService(fakeJobService))); + + PCollection<Integer> numErrors = + res.getFailedStorageApiInserts() + .apply( + "Count errors", + MapElements.into(TypeDescriptors.integers()) + .via(err -> err.getErrorMessage().equals("some serialization error") ? 1 : 0)) + .apply( + Window.<Integer>into(new GlobalWindows()) + .triggering(AfterWatermark.pastEndOfWindow()) + .discardingFiredPanes() + .withAllowedLateness(Duration.ZERO)) + .apply(Sum.integersGlobally()); + + PAssert.that(numErrors).containsInAnyOrder(numRows); + + p.run().waitUntilFinish(); + } + @Test public void testWriteProtos() throws Exception { BigQueryIO.Write.Method method = diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java index d754927bdb3..9c6fae164fc 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java @@ -144,7 +144,7 @@ public class BigQuerySinkMetricsTest { int notFoundVal = Status.Code.NOT_FOUND.value(); Throwable grpcError = - new Exceptions.AppendSerializtionError(notFoundVal, "Test 
Error", "Stream name", null); + new Exceptions.AppendSerializationError(notFoundVal, "Test Error", "Stream name", null); assertThat(BigQuerySinkMetrics.throwableToGRPCCodeString(grpcError), equalTo("NOT_FOUND")); } @@ -220,7 +220,7 @@ public class BigQuerySinkMetricsTest { c.setOperationEndTime(t1.plusMillis(5)); int notFoundVal = Status.Code.NOT_FOUND.value(); Throwable grpcError = - new Exceptions.AppendSerializtionError(notFoundVal, "Test Error", "Stream name", null); + new Exceptions.AppendSerializationError(notFoundVal, "Test Error", "Stream name", null); c.setError(grpcError); // Test disabled SupportMetricsDeletion