reuvenlax commented on code in PR #24145:
URL: https://github.com/apache/beam/pull/24145#discussion_r1081908125


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -1851,6 +1859,197 @@ public TableRow apply(Long input) {
     p.run();
   }
 
+  @Test
+  public void testUpdateTableSchemaUseSet() throws Exception {
+    updateTableSchemaTest(true);
+  }
+
+  @Test
+  public void testUpdateTableSchemaUseSetF() throws Exception {
+    updateTableSchemaTest(false);
+  }
+
+  @SuppressWarnings({"unused"})
+  static class UpdateTableSchemaDoFn extends DoFn<KV<String, TableRow>, 
TableRow> {
+    @TimerId("updateTimer")
+    private final TimerSpec updateTimerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    private final Duration timerOffset;
+    private final String updatedSchema;
+    private final FakeDatasetService fakeDatasetService;
+
+    UpdateTableSchemaDoFn(
+        Duration timerOffset, TableSchema updatedSchema, FakeDatasetService 
fakeDatasetService) {
+      this.timerOffset = timerOffset;
+      this.updatedSchema = BigQueryHelpers.toJsonString(updatedSchema);
+      this.fakeDatasetService = fakeDatasetService;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<String, TableRow> element,
+        @TimerId("updateTimer") Timer updateTimer,
+        OutputReceiver<TableRow> o)
+        throws IOException {
+      updateTimer.offset(timerOffset).setRelative();
+      o.output(element.getValue());
+    }
+
+    @OnTimer("updateTimer")
+    public void onTimer(@Key String tableSpec) throws IOException {
+      fakeDatasetService.updateTableSchema(
+          BigQueryHelpers.parseTableSpec(tableSpec),
+          BigQueryHelpers.fromJsonString(updatedSchema, TableSchema.class));
+    }
+  }
+
+  public void updateTableSchemaTest(boolean useSet) throws Exception {
+    assumeTrue(useStreaming);
+    assumeTrue(useStorageApi);
+
+    // Make sure that GroupIntoBatches does not buffer data.
+    
p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(1);
+    p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(1);
+
+    BigQueryIO.Write.Method method =
+        useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : 
Method.STORAGE_WRITE_API;
+    p.enableAbandonedNodeEnforcement(false);
+
+    TableReference tableRef = 
BigQueryHelpers.parseTableSpec("project-id:dataset-id.table");
+    TableSchema tableSchema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new 
TableFieldSchema().setName("number").setType("INTEGER"),
+                    new 
TableFieldSchema().setName("req").setType("STRING").setMode("REQUIRED")));
+    TableSchema tableSchemaUpdated =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new 
TableFieldSchema().setName("number").setType("INTEGER"),
+                    new TableFieldSchema().setName("req").setType("STRING"),
+                    new 
TableFieldSchema().setName("double_number").setType("INTEGER")));
+    fakeDatasetService.createTable(new 
Table().setTableReference(tableRef).setSchema(tableSchema));
+
+    LongFunction<TableRow> getRowSet =
+        (LongFunction<TableRow> & Serializable)
+            (long i) -> {
+              TableRow row =
+                  new TableRow()
+                      .set("name", "name" + i)
+                      .set("number", Long.toString(i))
+                      .set("double_number", Long.toString(i * 2));
+              if (i <= 5) {
+                row = row.set("req", "foo");
+              }
+              return row;
+            };
+
+    LongFunction<TableRow> getRowSetF =
+        (LongFunction<TableRow> & Serializable)
+            (long i) ->
+                new TableRow()
+                    .setF(
+                        ImmutableList.of(
+                            new TableCell().setV("name" + i),
+                            new TableCell().setV(Long.toString(i)),
+                            new TableCell().setV(i > 5 ? null : "foo"),
+                            new TableCell().setV(Long.toString(i * 2))));
+
+    LongFunction<TableRow> getRow = useSet ? getRowSet : getRowSetF;
+
+    TestStream.Builder<Long> testStream =
+        TestStream.create(VarLongCoder.of()).advanceWatermarkTo(new 
Instant(0));
+    // These rows contain unknown fields, which should be dropped.
+    for (long i = 0; i < 5; i++) {
+      testStream = testStream.addElements(i);
+    }
+    // Expire the timer, which should update the schema.
+    testStream = 
testStream.advanceProcessingTime(Duration.standardSeconds(10));
+    // Add one element to trigger discovery of new schema.
+    testStream = testStream.addElements(5L);
+    testStream = 
testStream.advanceProcessingTime(Duration.standardSeconds(10));
+
+    // Now all fields should be known.
+    for (long i = 6; i < 10; i++) {
+      testStream = testStream.addElements(i);
+    }
+
+    PCollection<TableRow> tableRows =
+        p.apply(testStream.advanceWatermarkToInfinity())
+            .apply("getRow", 
MapElements.into(TypeDescriptor.of(TableRow.class)).via(getRow::apply))
+            .apply("add key", WithKeys.of("project-id:dataset-id.table"))
+            .apply(
+                "update schema",
+                ParDo.of(
+                    new UpdateTableSchemaDoFn(
+                        Duration.standardSeconds(5), tableSchemaUpdated, 
fakeDatasetService)))
+            .setCoder(TableRowJsonCoder.of());
+
+    tableRows.apply(
+        BigQueryIO.writeTableRows()
+            .to(tableRef)
+            .withMethod(method)
+            
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
+            .ignoreUnknownValues()
+            .withAutoSchemaUpdate(true)
+            .withTestServices(fakeBqServices)
+            .withoutValidation());
+
+    p.run();
+
+    Iterable<TableRow> expectedDroppedValues =
+        LongStream.range(0, 6)
+            .mapToObj(getRowSet)
+            .map(tr -> filterUnknownValues(tr, tableSchema.getFields()))
+            .collect(Collectors.toList());
+    Iterable<TableRow> expectedFullValues =
+        LongStream.range(6, 
10).mapToObj(getRowSet).collect(Collectors.toList());

Review Comment:
   `LongStream.range(6, 10)` is exclusive of its upper bound (i.e. it covers the half-open interval [6, 10), so 10 itself is not included).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to