reuvenlax commented on code in PR #23615:
URL: https://github.com/apache/beam/pull/23615#discussion_r1002750420


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -1932,6 +1937,141 @@ public TableRow apply(Long input) {
                 TableRow.class)));
   }
 
+  @Test
+  public void testUpdateTableSchemaPeriodicRefresh() throws Exception {
+    if (!useStreaming || !useStorageApi) {
+      return;
+    }
+    // Side inputs only update in between bundles. So we set schemaUpdateRetries to 1, to ensure
+    // that the conversion
+    // step gives up when the side input is disabled. We then have to disable the failed-rows
+    // collection (dead letter)
+    // to ensure that the bundle actually fails and is retried.
+    BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
+    bqOptions.setSchemaUpdateRetries(1);
+    bqOptions.setDisableStorageApiFailedRowsCollection(true);
+
+    BigQueryIO.Write.Method method =
+        useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API;
+    p.enableAbandonedNodeEnforcement(false);
+
+    final String tableSpec1 = "project-id:dataset-id.table1";
+    final String tableSpec2 = "project-id:dataset-id.table2";
+    final String tableSpec3 = "project-id:dataset-id.table3";
+    final TableReference tableRef1 = BigQueryHelpers.parseTableSpec(tableSpec1);
+    final TableReference tableRef2 = BigQueryHelpers.parseTableSpec(tableSpec2);
+    final TableReference tableRef3 = BigQueryHelpers.parseTableSpec(tableSpec3);
+
+    TableSchema tableSchema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new TableFieldSchema().setName("number").setType("INTEGER"),
+                    new TableFieldSchema().setName("tablespec").setType("STRING")));
+    fakeDatasetService.createTable(new Table().setTableReference(tableRef1).setSchema(tableSchema));
+    fakeDatasetService.createTable(new Table().setTableReference(tableRef2).setSchema(tableSchema));
+    fakeDatasetService.createTable(new Table().setTableReference(tableRef3).setSchema(tableSchema));
+
+    LongFunction<Iterable<TableRow>> getRow =
+        (LongFunction<Iterable<TableRow>> & Serializable)
+            (long i) -> {
+              TableRow tableRow;
+              if (i < 5) {
+                tableRow = new TableRow().set("name", "name" + i).set("number", Long.toString(i));
+              } else {
+                tableRow =
+                    new TableRow()
+                        .set("name", "name" + i)
+                        .set("number", Long.toString(i))
+                        .set("double_number", Long.toString(i * 2));
+              }
+              return ImmutableList.of(
+                  tableRow.clone().set("tablespec", tableSpec1),
+                  tableRow.clone().set("tablespec", tableSpec2),
+                  tableRow.clone().set("tablespec", tableSpec3));
+            };
+
+    SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> tableFunction =
+        value -> new TableDestination((String) value.getValue().get("tablespec"), "");
+
+    final int numRowsPerTable = 1;
+    TestStream.Builder<Long> testStream =
+        TestStream.create(VarLongCoder.of()).advanceWatermarkTo(new Instant(0));
+    for (long i = 0; i < numRowsPerTable; ++i) {
+      testStream = testStream.addElements(i);
+    }
+    testStream = testStream.advanceProcessingTime(Duration.standardSeconds(5));
+

Review Comment:
   Oh, good catch. I had reduced numRowsPerTable to 1 while debugging something 
and forgot to raise it back. After fixing this, though, the test fails, because 
DirectRunner doesn't retry failed bundles :( Is there any way around this? I 
can't retry inside the transform, because then the side input won't update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.