[ https://issues.apache.org/jira/browse/BEAM-1267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16336766#comment-16336766 ]
ASF GitHub Bot commented on BEAM-1267: -------------------------------------- aaltay closed pull request #1778: [BEAM-1267] Adds ignoreUnknownValues option to BigQuery.Write URL: https://github.com/apache/beam/pull/1778 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 4b19973f1bb..066827707f1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1641,7 +1641,7 @@ public static Bound withoutValidation() { // An option to indicate if table validation is desired. Default is true. 
final boolean validate; - + @Nullable final Boolean ignoreUnknownValues; @Nullable private BigQueryServices bigQueryServices; private static class TranslateTableSpecFunction implements @@ -1673,14 +1673,15 @@ public Bound() { CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, true /* validate */, - null /* bigQueryServices */); + null /* bigQueryServices */, + false /* ignoreUnknownValues */); } private Bound(String name, @Nullable ValueProvider<String> jsonTableRef, @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction, @Nullable ValueProvider<String> jsonSchema, CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate, - @Nullable BigQueryServices bigQueryServices) { + @Nullable BigQueryServices bigQueryServices, @Nullable Boolean ignoreUnknownValues) { super(name); this.jsonTableRef = jsonTableRef; this.tableRefFunction = tableRefFunction; @@ -1689,6 +1690,7 @@ private Bound(String name, @Nullable ValueProvider<String> jsonTableRef, this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); this.validate = validate; this.bigQueryServices = bigQueryServices; + this.ignoreUnknownValues = ignoreUnknownValues; } /** @@ -1730,7 +1732,7 @@ private Bound toTableRef(ValueProvider<TableReference> table) { return new Bound(name, NestedValueProvider.of(table, new TableRefToJson()), tableRefFunction, jsonSchema, createDisposition, - writeDisposition, validate, bigQueryServices); + writeDisposition, validate, bigQueryServices, ignoreUnknownValues); } /** @@ -1759,7 +1761,7 @@ public Bound to( public Bound toTableReference( SerializableFunction<BoundedWindow, TableReference> tableRefFunction) { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, validate, bigQueryServices); + writeDisposition, validate, bigQueryServices, ignoreUnknownValues); } /** @@ -1771,7 +1773,7 @@ public Bound toTableReference( public Bound withSchema(TableSchema 
schema) { return new Bound(name, jsonTableRef, tableRefFunction, StaticValueProvider.of(toJsonString(schema)), - createDisposition, writeDisposition, validate, bigQueryServices); + createDisposition, writeDisposition, validate, bigQueryServices, ignoreUnknownValues); } /** @@ -1780,7 +1782,7 @@ public Bound withSchema(TableSchema schema) { public Bound withSchema(ValueProvider<TableSchema> schema) { return new Bound(name, jsonTableRef, tableRefFunction, NestedValueProvider.of(schema, new TableSchemaToJsonSchema()), - createDisposition, writeDisposition, validate, bigQueryServices); + createDisposition, writeDisposition, validate, bigQueryServices, ignoreUnknownValues); } /** @@ -1790,7 +1792,7 @@ public Bound withSchema(ValueProvider<TableSchema> schema) { */ public Bound withCreateDisposition(CreateDisposition createDisposition) { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, - createDisposition, writeDisposition, validate, bigQueryServices); + createDisposition, writeDisposition, validate, bigQueryServices, ignoreUnknownValues); } /** @@ -1800,7 +1802,7 @@ public Bound withCreateDisposition(CreateDisposition createDisposition) { */ public Bound withWriteDisposition(WriteDisposition writeDisposition) { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, - createDisposition, writeDisposition, validate, bigQueryServices); + createDisposition, writeDisposition, validate, bigQueryServices, ignoreUnknownValues); } /** @@ -1810,13 +1812,23 @@ public Bound withWriteDisposition(WriteDisposition writeDisposition) { */ public Bound withoutValidation() { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, false, bigQueryServices); + writeDisposition, false, bigQueryServices, ignoreUnknownValues); + } + + /** + * Returns a copy of this write transformation, but with ignoreUnknownValues set to true. + * + * <p>Does not modify this object. 
+ */ + public Bound withIgnoreUnknownValues() { + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, bigQueryServices, true); } @VisibleForTesting Bound withTestServices(BigQueryServices testServices) { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, validate, testServices); + writeDisposition, validate, testServices, ignoreUnknownValues); } private static void verifyTableEmpty( @@ -1919,7 +1931,8 @@ public PDone expand(PCollection<TableRow> input) { if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) { return input.apply( new StreamWithDeDup(getTable(), tableRefFunction, - NestedValueProvider.of(jsonSchema, new JsonSchemaToTableSchema()), bqServices)); + NestedValueProvider.of(jsonSchema, new JsonSchemaToTableSchema()), bqServices, + ignoreUnknownValues)); } ValueProvider<TableReference> table = getTableWithDefaultProject(options); @@ -1976,7 +1989,8 @@ public PDone expand(PCollection<TableRow> input) { NestedValueProvider.of(table, new TableRefToJson()), jsonSchema, WriteDisposition.WRITE_EMPTY, - CreateDisposition.CREATE_IF_NEEDED))); + CreateDisposition.CREATE_IF_NEEDED, + ignoreUnknownValues))); PCollectionView<Iterable<String>> tempTablesView = tempTables .apply("TempTablesView", View.<String>asIterable()); @@ -2001,7 +2015,8 @@ public PDone expand(PCollection<TableRow> input) { NestedValueProvider.of(table, new TableRefToJson()), jsonSchema, writeDisposition, - createDisposition))); + createDisposition, + ignoreUnknownValues))); return PDone.in(input.getPipeline()); } @@ -2138,6 +2153,11 @@ public boolean getValidate() { return validate; } + /** Returns {@code true} if ignoreUnknownValues is enabled. 
*/ + public Boolean getIgnoreUnknownValues() { + return ignoreUnknownValues; + } + private BigQueryServices getBigQueryServices() { if (bigQueryServices == null) { bigQueryServices = new BigQueryServicesImpl(); @@ -2254,6 +2274,7 @@ public void processElement(ProcessContext c) throws Exception { private final ValueProvider<String> jsonSchema; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; + private final Boolean ignoreUnknownValues; public WriteTables( boolean singlePartition, @@ -2264,6 +2285,28 @@ public WriteTables( ValueProvider<String> jsonSchema, WriteDisposition writeDisposition, CreateDisposition createDisposition) { + this( + singlePartition, + bqServices, + jobIdToken, + tempFilePrefix, + jsonTableRef, + jsonSchema, + writeDisposition, + createDisposition, + false /* ignoreUnknownValues */); + } + + public WriteTables( + boolean singlePartition, + BigQueryServices bqServices, + String jobIdToken, + String tempFilePrefix, + ValueProvider<String> jsonTableRef, + ValueProvider<String> jsonSchema, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + Boolean ignoreUnknownValues) { this.singlePartition = singlePartition; this.bqServices = bqServices; this.jobIdToken = jobIdToken; @@ -2272,6 +2315,7 @@ public WriteTables( this.jsonSchema = jsonSchema; this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; + this.ignoreUnknownValues = ignoreUnknownValues; } @ProcessElement @@ -2291,7 +2335,8 @@ public void processElement(ProcessContext c) throws Exception { jsonSchema == null ? 
null : jsonSchema.get(), TableSchema.class), partition, writeDisposition, - createDisposition); + createDisposition, + ignoreUnknownValues); c.output(toJsonString(ref)); removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition); @@ -2304,14 +2349,16 @@ private void load( @Nullable TableSchema schema, List<String> gcsUris, WriteDisposition writeDisposition, - CreateDisposition createDisposition) throws InterruptedException, IOException { + CreateDisposition createDisposition, + Boolean ignoreUnknownValues) throws InterruptedException, IOException { JobConfigurationLoad loadConfig = new JobConfigurationLoad() .setDestinationTable(ref) .setSchema(schema) .setSourceUris(gcsUris) .setWriteDisposition(writeDisposition.name()) .setCreateDisposition(createDisposition.name()) - .setSourceFormat("NEWLINE_DELIMITED_JSON"); + .setSourceFormat("NEWLINE_DELIMITED_JSON") + .setIgnoreUnknownValues(ignoreUnknownValues); String projectId = ref.getProjectId(); for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { @@ -2564,6 +2611,8 @@ static void clearCreatedTables() { private final BigQueryServices bqServices; + private final Boolean ignoreUnknownValues; + /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */ private transient Map<String, List<TableRow>> tableRows; @@ -2580,10 +2629,12 @@ static void clearCreatedTables() { createAggregator("ByteCount", Sum.ofLongs()); /** Constructor. 
*/ - StreamingWriteFn(ValueProvider<TableSchema> schema, BigQueryServices bqServices) { + StreamingWriteFn(ValueProvider<TableSchema> schema, BigQueryServices bqServices, + Boolean ignoreUnknownValues) { this.jsonTableSchema = NestedValueProvider.of(schema, new TableSchemaToJsonSchema()); this.bqServices = checkNotNull(bqServices, "bqServices"); + this.ignoreUnknownValues = ignoreUnknownValues; } /** @@ -2671,7 +2722,7 @@ private void flushRows(TableReference tableReference, if (!tableRows.isEmpty()) { try { long totalBytes = bqServices.getDatasetService(options).insertAll( - tableReference, tableRows, uniqueIds); + tableReference, tableRows, uniqueIds, ignoreUnknownValues); byteCountAggregator.addValue(totalBytes); } catch (IOException e) { throw new RuntimeException(e); @@ -2898,16 +2949,19 @@ private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction; private final transient ValueProvider<TableSchema> tableSchema; private final BigQueryServices bqServices; + private final Boolean ignoreUnknownValues; /** Constructor. 
*/ StreamWithDeDup(ValueProvider<TableReference> tableReference, SerializableFunction<BoundedWindow, TableReference> tableRefFunction, ValueProvider<TableSchema> tableSchema, - BigQueryServices bqServices) { + BigQueryServices bqServices, + Boolean ignoreUnknownValues) { this.tableReference = tableReference; this.tableRefFunction = tableRefFunction; this.tableSchema = tableSchema; this.bqServices = checkNotNull(bqServices, "bqServices"); + this.ignoreUnknownValues = ignoreUnknownValues; } @Override @@ -2938,7 +2992,7 @@ public PDone expand(PCollection<TableRow> input) { tagged .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of())) .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of()) - .apply(ParDo.of(new StreamingWriteFn(tableSchema, bqServices))); + .apply(ParDo.of(new StreamingWriteFn(tableSchema, bqServices, ignoreUnknownValues))); // Note that the implementation to return PDone here breaks the // implicit assumption about the job execution order. If a user diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 7173996ba3c..74fc3cfada8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -166,7 +166,18 @@ void deleteDataset(String projectId, String datasetId) * * <p>Returns the total bytes count of {@link TableRow TableRows}. */ - long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) + long insertAll( + TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) + throws IOException, InterruptedException; + + /** + * Inserts {@link TableRow TableRows} with the specified insertIds if not null. 
+ * + * <p>Returns the total bytes count of {@link TableRow TableRows}. + */ + long insertAll( + TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList, + Boolean ignoreUnknownValues) throws IOException, InterruptedException; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 2098148a3c7..5ece5f62641 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -657,7 +657,8 @@ public void deleteDataset(String projectId, String datasetId) @VisibleForTesting long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList, - BackOff backoff, final Sleeper sleeper) throws IOException, InterruptedException { + BackOff backoff, final Sleeper sleeper, Boolean ignoreUnknownValues) + throws IOException, InterruptedException { checkNotNull(ref, "ref"); if (executor == null) { this.executor = options.as(GcsOptions.class).getExecutorService(); @@ -699,6 +700,7 @@ long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String || i == rowsToPublish.size() - 1) { TableDataInsertAllRequest content = new TableDataInsertAllRequest(); content.setRows(rows); + content.setIgnoreUnknownValues(ignoreUnknownValues); final Bigquery.Tabledata.InsertAll insert = client.tabledata() .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), @@ -794,8 +796,20 @@ public long insertAll( TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) throws IOException, InterruptedException { return insertAll( - ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); + ref, rowList, insertIdList, 
INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT, + false); + } + + @Override + public long insertAll( + TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList, + Boolean ignoreUnknownValues) + throws IOException, InterruptedException { + return insertAll( + ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT, + ignoreUnknownValues); } + } private static class BigQueryJsonReaderImpl implements BigQueryJsonReader { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 471b5e4d09c..6a119bf2745 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -627,7 +627,8 @@ public void deleteDataset(String projectId, String datasetId) @Override public long insertAll( - TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) + TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList, + Boolean ignoreUnknownValues) throws IOException, InterruptedException { synchronized (tables) { assertEquals(rowList.size(), insertIdList.size()); @@ -642,6 +643,13 @@ public long insertAll( return dataSize; } } + + @Override + public long insertAll( + TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList) + throws IOException, InterruptedException { + return insertAll(ref, rowList, insertIdList, false); + } } @Rule public final transient TestPipeline p = TestPipeline.create(); @@ -699,6 +707,12 @@ private void checkWriteObjectWithValidate( assertEquals(validate, bound.validate); } + private void checkWriteObjectWithIgnoreUnknownValues( + BigQueryIO.Write.Bound bound, + boolean ignoreUnknownValues) { + 
assertEquals(ignoreUnknownValues, bound.ignoreUnknownValues); + } + @Before public void setUp() throws IOException { MockitoAnnotations.initMocks(this); @@ -1349,6 +1363,15 @@ public void testBuildWriteWithoutValidation() { null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, false); } + @Test + public void testBuildWriteWithIgnoreUnknownValues() { + // This test just checks that using ignoreUnknownValues will not trigger object + // construction errors. + BigQueryIO.Write.Bound bound = + BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withIgnoreUnknownValues(); + checkWriteObjectWithIgnoreUnknownValues(bound, true); + } + @Test public void testBuildWriteDefaultProject() { BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable"); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 1ce10f1519c..4542ed68c65 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -498,7 +498,7 @@ public void testInsertRetry() throws Exception { DatasetServiceImpl dataService = new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); - dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper()); + dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper(), false); verify(response, times(2)).getStatusCode(); verify(response, times(2)).getContent(); verify(response, times(2)).getContentType(); @@ -534,7 +534,7 @@ public void testInsertRetrySelectRows() throws Exception { DatasetServiceImpl dataService = new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); - dataService.insertAll(ref, 
rows, insertIds, TEST_BACKOFF.backoff(), new MockSleeper()); + dataService.insertAll(ref, rows, insertIds, TEST_BACKOFF.backoff(), new MockSleeper(), false); verify(response, times(2)).getStatusCode(); verify(response, times(2)).getContent(); verify(response, times(2)).getContentType(); @@ -575,7 +575,7 @@ public InputStream answer(InvocationOnMock invocation) throws Throwable { // Expect it to fail. try { - dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper()); + dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper(), false); fail(); } catch (IOException e) { assertThat(e, instanceOf(IOException.class)); @@ -615,7 +615,7 @@ public void testInsertDoesNotRetry() throws Throwable { new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); try { - dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper()); + dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper(), false); fail(); } catch (RuntimeException e) { verify(response, times(1)).getStatusCode(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > BigQueryIO.Write should support the ignoreUnknownValues option > -------------------------------------------------------------- > > Key: BEAM-1267 > URL: https://issues.apache.org/jira/browse/BEAM-1267 > Project: Beam > Issue Type: New Feature > Components: sdk-java-gcp > Reporter: George Agnelli > Priority: Major > Labels: newbie, starter > > Because it helps make schema evolution easier. -- This message was sent by Atlassian JIRA (v7.6.3#76005)