[ https://issues.apache.org/jira/browse/BEAM-1267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16336766#comment-16336766 ]

ASF GitHub Bot commented on BEAM-1267:
--------------------------------------

aaltay closed pull request #1778: [BEAM-1267] Adds ignoreUnknownValues option to BigQuery.Write
URL: https://github.com/apache/beam/pull/1778
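
For context, here is a minimal sketch of how the new option is meant to be used
from a pipeline. The project, dataset, table, schema, and row below are made up
for illustration; only the withIgnoreUnknownValues() builder call comes from this
change, mirroring the test added in the diff:

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import java.util.Arrays;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class IgnoreUnknownValuesExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // The destination table only declares "user"; "new_field" was added by a
        // newer producer and is not (yet) part of the table schema.
        TableSchema schema = new TableSchema().setFields(Arrays.asList(
            new TableFieldSchema().setName("user").setType("STRING")));
        TableRow row = new TableRow().set("user", "alice").set("new_field", "extra");

        p.apply(Create.of(Arrays.asList(row)).withCoder(TableRowJsonCoder.of()))
            .apply(BigQueryIO.Write
                .to("my-project:my_dataset.my_table")  // hypothetical table spec
                .withSchema(schema)
                // New in this PR: unknown fields such as "new_field" are dropped by
                // BigQuery instead of failing the load job or streaming insert.
                .withIgnoreUnknownValues());

        p.run();
      }
    }

With ignoreUnknownValues unset (the default), the same row would be rejected
because "new_field" does not match the table schema.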
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 4b19973f1bb..066827707f1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1641,7 +1641,7 @@ public static Bound withoutValidation() {
 
       // An option to indicate if table validation is desired. Default is true.
       final boolean validate;
-
+      @Nullable final Boolean ignoreUnknownValues;
       @Nullable private BigQueryServices bigQueryServices;
 
       private static class TranslateTableSpecFunction implements
@@ -1673,14 +1673,15 @@ public Bound() {
             CreateDisposition.CREATE_IF_NEEDED,
             WriteDisposition.WRITE_EMPTY,
             true /* validate */,
-            null /* bigQueryServices */);
+            null /* bigQueryServices */,
+            false /* ignoreUnknownValues */);
       }
 
       private Bound(String name, @Nullable ValueProvider<String> jsonTableRef,
           @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
           @Nullable ValueProvider<String> jsonSchema,
           CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate,
-          @Nullable BigQueryServices bigQueryServices) {
+          @Nullable BigQueryServices bigQueryServices, @Nullable Boolean ignoreUnknownValues) {
         super(name);
         this.jsonTableRef = jsonTableRef;
         this.tableRefFunction = tableRefFunction;
@@ -1689,6 +1690,7 @@ private Bound(String name, @Nullable ValueProvider<String> jsonTableRef,
         this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
         this.validate = validate;
         this.bigQueryServices = bigQueryServices;
+        this.ignoreUnknownValues = ignoreUnknownValues;
       }
 
       /**
@@ -1730,7 +1732,7 @@ private Bound toTableRef(ValueProvider<TableReference> table) {
         return new Bound(name,
             NestedValueProvider.of(table, new TableRefToJson()),
             tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, bigQueryServices);
+            writeDisposition, validate, bigQueryServices, ignoreUnknownValues);
       }
 
       /**
@@ -1759,7 +1761,7 @@ public Bound to(
       public Bound toTableReference(
           SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, bigQueryServices);
+            writeDisposition, validate, bigQueryServices, ignoreUnknownValues);
       }
 
       /**
@@ -1771,7 +1773,7 @@ public Bound toTableReference(
       public Bound withSchema(TableSchema schema) {
         return new Bound(name, jsonTableRef, tableRefFunction,
             StaticValueProvider.of(toJsonString(schema)),
-            createDisposition, writeDisposition, validate, bigQueryServices);
+            createDisposition, writeDisposition, validate, bigQueryServices, 
ignoreUnknownValues);
       }
 
       /**
@@ -1780,7 +1782,7 @@ public Bound withSchema(TableSchema schema) {
       public Bound withSchema(ValueProvider<TableSchema> schema) {
         return new Bound(name, jsonTableRef, tableRefFunction,
             NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
-            createDisposition, writeDisposition, validate, bigQueryServices);
+            createDisposition, writeDisposition, validate, bigQueryServices, ignoreUnknownValues);
       }
 
       /**
@@ -1790,7 +1792,7 @@ public Bound withSchema(ValueProvider<TableSchema> schema) {
        */
       public Bound withCreateDisposition(CreateDisposition createDisposition) {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
-            createDisposition, writeDisposition, validate, bigQueryServices);
+            createDisposition, writeDisposition, validate, bigQueryServices, ignoreUnknownValues);
       }
 
       /**
@@ -1800,7 +1802,7 @@ public Bound withCreateDisposition(CreateDisposition createDisposition) {
        */
       public Bound withWriteDisposition(WriteDisposition writeDisposition) {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
-            createDisposition, writeDisposition, validate, bigQueryServices);
+            createDisposition, writeDisposition, validate, bigQueryServices, ignoreUnknownValues);
       }
 
       /**
@@ -1810,13 +1812,23 @@ public Bound withWriteDisposition(WriteDisposition writeDisposition) {
        */
       public Bound withoutValidation() {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, false, bigQueryServices);
+            writeDisposition, false, bigQueryServices, ignoreUnknownValues);
+      }
+
+      /**
+       * Returns a copy of this write transformation, but with ignoreUnknownValues set to true.
+       *
+       * <p>Does not modify this object.
+       */
+      public Bound withIgnoreUnknownValues() {
+        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+            writeDisposition, validate, bigQueryServices, true);
       }
 
       @VisibleForTesting
       Bound withTestServices(BigQueryServices testServices) {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, testServices);
+            writeDisposition, validate, testServices, ignoreUnknownValues);
       }
 
       private static void verifyTableEmpty(
@@ -1919,7 +1931,8 @@ public PDone expand(PCollection<TableRow> input) {
         if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
           return input.apply(
               new StreamWithDeDup(getTable(), tableRefFunction,
-                  NestedValueProvider.of(jsonSchema, new JsonSchemaToTableSchema()), bqServices));
+                  NestedValueProvider.of(jsonSchema, new JsonSchemaToTableSchema()), bqServices,
+                  ignoreUnknownValues));
         }
 
         ValueProvider<TableReference> table = getTableWithDefaultProject(options);
@@ -1976,7 +1989,8 @@ public PDone expand(PCollection<TableRow> input) {
                 NestedValueProvider.of(table, new TableRefToJson()),
                 jsonSchema,
                 WriteDisposition.WRITE_EMPTY,
-                CreateDisposition.CREATE_IF_NEEDED)));
+                CreateDisposition.CREATE_IF_NEEDED,
+                ignoreUnknownValues)));
 
         PCollectionView<Iterable<String>> tempTablesView = tempTables
             .apply("TempTablesView", View.<String>asIterable());
@@ -2001,7 +2015,8 @@ public PDone expand(PCollection<TableRow> input) {
                 NestedValueProvider.of(table, new TableRefToJson()),
                 jsonSchema,
                 writeDisposition,
-                createDisposition)));
+                createDisposition,
+                ignoreUnknownValues)));
 
         return PDone.in(input.getPipeline());
       }
@@ -2138,6 +2153,11 @@ public boolean getValidate() {
         return validate;
       }
 
+      /** Returns {@code true} if ignoreUnknownValues is enabled. */
+      public Boolean getIgnoreUnknownValues() {
+        return ignoreUnknownValues;
+      }
+
       private BigQueryServices getBigQueryServices() {
         if (bigQueryServices == null) {
           bigQueryServices = new BigQueryServicesImpl();
@@ -2254,6 +2274,7 @@ public void processElement(ProcessContext c) throws Exception {
       private final ValueProvider<String> jsonSchema;
       private final WriteDisposition writeDisposition;
       private final CreateDisposition createDisposition;
+      private final Boolean ignoreUnknownValues;
 
       public WriteTables(
           boolean singlePartition,
@@ -2264,6 +2285,28 @@ public WriteTables(
           ValueProvider<String> jsonSchema,
           WriteDisposition writeDisposition,
           CreateDisposition createDisposition) {
+        this(
+          singlePartition,
+          bqServices,
+          jobIdToken,
+          tempFilePrefix,
+          jsonTableRef,
+          jsonSchema,
+          writeDisposition,
+          createDisposition,
+          false /* ignoreUnknownValues */);
+      }
+
+      public WriteTables(
+        boolean singlePartition,
+        BigQueryServices bqServices,
+        String jobIdToken,
+        String tempFilePrefix,
+        ValueProvider<String> jsonTableRef,
+        ValueProvider<String> jsonSchema,
+        WriteDisposition writeDisposition,
+        CreateDisposition createDisposition,
+        Boolean ignoreUnknownValues) {
         this.singlePartition = singlePartition;
         this.bqServices = bqServices;
         this.jobIdToken = jobIdToken;
@@ -2272,6 +2315,7 @@ public WriteTables(
         this.jsonSchema = jsonSchema;
         this.writeDisposition = writeDisposition;
         this.createDisposition = createDisposition;
+        this.ignoreUnknownValues = ignoreUnknownValues;
       }
 
       @ProcessElement
@@ -2291,7 +2335,8 @@ public void processElement(ProcessContext c) throws Exception {
                 jsonSchema == null ? null : jsonSchema.get(), TableSchema.class),
             partition,
             writeDisposition,
-            createDisposition);
+            createDisposition,
+            ignoreUnknownValues);
         c.output(toJsonString(ref));
 
         removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition);
@@ -2304,14 +2349,16 @@ private void load(
           @Nullable TableSchema schema,
           List<String> gcsUris,
           WriteDisposition writeDisposition,
-          CreateDisposition createDisposition) throws InterruptedException, IOException {
+          CreateDisposition createDisposition,
+          Boolean ignoreUnknownValues) throws InterruptedException, IOException {
         JobConfigurationLoad loadConfig = new JobConfigurationLoad()
             .setDestinationTable(ref)
             .setSchema(schema)
             .setSourceUris(gcsUris)
             .setWriteDisposition(writeDisposition.name())
             .setCreateDisposition(createDisposition.name())
-            .setSourceFormat("NEWLINE_DELIMITED_JSON");
+            .setSourceFormat("NEWLINE_DELIMITED_JSON")
+            .setIgnoreUnknownValues(ignoreUnknownValues);
 
         String projectId = ref.getProjectId();
         for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
@@ -2564,6 +2611,8 @@ static void clearCreatedTables() {
 
     private final BigQueryServices bqServices;
 
+    private final Boolean ignoreUnknownValues;
+
     /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
     private transient Map<String, List<TableRow>> tableRows;
 
@@ -2580,10 +2629,12 @@ static void clearCreatedTables() {
         createAggregator("ByteCount", Sum.ofLongs());
 
     /** Constructor. */
-    StreamingWriteFn(ValueProvider<TableSchema> schema, BigQueryServices bqServices) {
+    StreamingWriteFn(ValueProvider<TableSchema> schema, BigQueryServices bqServices,
+        Boolean ignoreUnknownValues) {
       this.jsonTableSchema =
           NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
       this.bqServices = checkNotNull(bqServices, "bqServices");
+      this.ignoreUnknownValues = ignoreUnknownValues;
     }
 
     /**
@@ -2671,7 +2722,7 @@ private void flushRows(TableReference tableReference,
       if (!tableRows.isEmpty()) {
         try {
           long totalBytes = bqServices.getDatasetService(options).insertAll(
-              tableReference, tableRows, uniqueIds);
+              tableReference, tableRows, uniqueIds, ignoreUnknownValues);
           byteCountAggregator.addValue(totalBytes);
         } catch (IOException e) {
           throw new RuntimeException(e);
@@ -2898,16 +2949,19 @@ private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window
     private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
     private final transient ValueProvider<TableSchema> tableSchema;
     private final BigQueryServices bqServices;
+    private final Boolean ignoreUnknownValues;
 
     /** Constructor. */
     StreamWithDeDup(ValueProvider<TableReference> tableReference,
         SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
         ValueProvider<TableSchema> tableSchema,
-        BigQueryServices bqServices) {
+        BigQueryServices bqServices,
+        Boolean ignoreUnknownValues) {
       this.tableReference = tableReference;
       this.tableRefFunction = tableRefFunction;
       this.tableSchema = tableSchema;
       this.bqServices = checkNotNull(bqServices, "bqServices");
+      this.ignoreUnknownValues = ignoreUnknownValues;
     }
 
     @Override
@@ -2938,7 +2992,7 @@ public PDone expand(PCollection<TableRow> input) {
       tagged
          .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
          .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
-          .apply(ParDo.of(new StreamingWriteFn(tableSchema, bqServices)));
+          .apply(ParDo.of(new StreamingWriteFn(tableSchema, bqServices, ignoreUnknownValues)));
 
       // Note that the implementation to return PDone here breaks the
       // implicit assumption about the job execution order. If a user
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 7173996ba3c..74fc3cfada8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -166,7 +166,18 @@ void deleteDataset(String projectId, String datasetId)
      *
      * <p>Returns the total bytes count of {@link TableRow TableRows}.
      */
-    long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+    long insertAll(
+        TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+        throws IOException, InterruptedException;
+
+    /**
+     * Inserts {@link TableRow TableRows} with the specified insertIds if not null.
+     *
+     * <p>Returns the total bytes count of {@link TableRow TableRows}.
+     */
+    long insertAll(
+        TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList,
+        Boolean ignoreUnknownValues)
         throws IOException, InterruptedException;
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 2098148a3c7..5ece5f62641 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -657,7 +657,8 @@ public void deleteDataset(String projectId, String datasetId)
 
     @VisibleForTesting
     long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList,
-        BackOff backoff, final Sleeper sleeper) throws IOException, InterruptedException {
+        BackOff backoff, final Sleeper sleeper, Boolean ignoreUnknownValues)
+        throws IOException, InterruptedException {
       checkNotNull(ref, "ref");
       if (executor == null) {
         this.executor = options.as(GcsOptions.class).getExecutorService();
@@ -699,6 +700,7 @@ long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String
               || i == rowsToPublish.size() - 1) {
             TableDataInsertAllRequest content = new TableDataInsertAllRequest();
             content.setRows(rows);
+            content.setIgnoreUnknownValues(ignoreUnknownValues);
 
             final Bigquery.Tabledata.InsertAll insert = client.tabledata()
                 .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
@@ -794,8 +796,20 @@ public long insertAll(
         TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
         throws IOException, InterruptedException {
       return insertAll(
-          ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+          ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT,
+          false);
+    }
+
+    @Override
+    public long insertAll(
+        TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList,
+        Boolean ignoreUnknownValues)
+        throws IOException, InterruptedException {
+      return insertAll(
+          ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT,
+          ignoreUnknownValues);
     }
+
   }
 
   private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 471b5e4d09c..6a119bf2745 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -627,7 +627,8 @@ public void deleteDataset(String projectId, String datasetId)
 
     @Override
     public long insertAll(
-        TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+        TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList,
+        Boolean ignoreUnknownValues)
         throws IOException, InterruptedException {
       synchronized (tables) {
         assertEquals(rowList.size(), insertIdList.size());
@@ -642,6 +643,13 @@ public long insertAll(
         return dataSize;
       }
     }
+
+    @Override
+    public long insertAll(
+        TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+        throws IOException, InterruptedException {
+      return insertAll(ref, rowList, insertIdList, false);
+    }
   }
 
   @Rule public final transient TestPipeline p = TestPipeline.create();
@@ -699,6 +707,12 @@ private void checkWriteObjectWithValidate(
     assertEquals(validate, bound.validate);
   }
 
+  private void checkWriteObjectWithIgnoreUnknownValues(
+      BigQueryIO.Write.Bound bound,
+      boolean ignoreUnknownValues) {
+    assertEquals(ignoreUnknownValues, bound.ignoreUnknownValues);
+  }
+
   @Before
   public void setUp() throws IOException {
     MockitoAnnotations.initMocks(this);
@@ -1349,6 +1363,15 @@ public void testBuildWriteWithoutValidation() {
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, false);
   }
 
+  @Test
+  public void testBuildWriteWithIgnoreUnknownValues() {
+    // This test just checks that using ignoreUnknownValues will not trigger object
+    // construction errors.
+    BigQueryIO.Write.Bound bound =
+        BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withIgnoreUnknownValues();
+    checkWriteObjectWithIgnoreUnknownValues(bound, true);
+  }
+
   @Test
   public void testBuildWriteDefaultProject() {
     BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable");
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 1ce10f1519c..4542ed68c65 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -498,7 +498,7 @@ public void testInsertRetry() throws Exception {
 
     DatasetServiceImpl dataService =
         new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
-    dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
+    dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper(), false);
     verify(response, times(2)).getStatusCode();
     verify(response, times(2)).getContent();
     verify(response, times(2)).getContentType();
@@ -534,7 +534,7 @@ public void testInsertRetrySelectRows() throws Exception {
 
     DatasetServiceImpl dataService =
         new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
-    dataService.insertAll(ref, rows, insertIds, TEST_BACKOFF.backoff(), new MockSleeper());
+    dataService.insertAll(ref, rows, insertIds, TEST_BACKOFF.backoff(), new MockSleeper(), false);
     verify(response, times(2)).getStatusCode();
     verify(response, times(2)).getContent();
     verify(response, times(2)).getContentType();
@@ -575,7 +575,7 @@ public InputStream answer(InvocationOnMock invocation) throws Throwable {
 
     // Expect it to fail.
     try {
-      dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
+      dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper(), false);
       fail();
     } catch (IOException e) {
       assertThat(e, instanceOf(IOException.class));
@@ -615,7 +615,7 @@ public void testInsertDoesNotRetry() throws Throwable {
         new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
 
     try {
-      dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
+      dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper(), false);
       fail();
     } catch (RuntimeException e) {
       verify(response, times(1)).getStatusCode();
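
For readers less familiar with the underlying service, the flag threaded through
above maps onto the ignoreUnknownValues setting of BigQuery's load and
tabledata.insertAll APIs. Below is a rough, hedged sketch of the streaming-insert
request that StreamingWriteFn now configures, using hypothetical project, dataset,
and table names and assuming an already-authenticated Bigquery client from the
google-api-services-bigquery library:

    import com.google.api.services.bigquery.Bigquery;
    import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
    import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
    import com.google.api.services.bigquery.model.TableRow;
    import java.io.IOException;
    import java.util.Collections;

    public class InsertAllSketch {
      static TableDataInsertAllResponse insertIgnoringUnknowns(
          Bigquery bigquery, TableRow row) throws IOException {
        TableDataInsertAllRequest.Rows entry = new TableDataInsertAllRequest.Rows()
            .setInsertId("row-0")  // arbitrary id, used for best-effort de-duplication
            .setJson(row);
        TableDataInsertAllRequest request = new TableDataInsertAllRequest()
            .setRows(Collections.singletonList(entry))
            // The setting this PR exposes: fields in the row that are not present in
            // the table schema are ignored instead of failing the whole insert.
            .setIgnoreUnknownValues(true);
        return bigquery.tabledata()
            .insertAll("my-project", "my_dataset", "my_table", request)
            .execute();
      }
    }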


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> BigQueryIO.Write should support the ignoreUnknownValues option
> --------------------------------------------------------------
>
>                 Key: BEAM-1267
>                 URL: https://issues.apache.org/jira/browse/BEAM-1267
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-gcp
>            Reporter: George Agnelli
>            Priority: Major
>              Labels: newbie, starter
>
> Because it helps make schema evolution easier.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
