reuvenlax commented on code in PR #23615:
URL: https://github.com/apache/beam/pull/23615#discussion_r1002750420
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -1932,6 +1937,141 @@ public TableRow apply(Long input) {
TableRow.class)));
}
+ @Test
+ public void testUpdateTableSchemaPeriodicRefresh() throws Exception {
+ if (!useStreaming || !useStorageApi) {
+ return;
+ }
+ // Side inputs only update in between bundles. So we set schemaUpdateRetries to 1, to ensure
+ // that the conversion step gives up when the side input is disabled. We then have to disable
+ // the failed-rows collection (dead letter) to ensure that the bundle actually fails and is retried.
+ BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
+ bqOptions.setSchemaUpdateRetries(1);
+ bqOptions.setDisableStorageApiFailedRowsCollection(true);
+
+ BigQueryIO.Write.Method method =
+ useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API;
+ p.enableAbandonedNodeEnforcement(false);
+
+ final String tableSpec1 = "project-id:dataset-id.table1";
+ final String tableSpec2 = "project-id:dataset-id.table2";
+ final String tableSpec3 = "project-id:dataset-id.table3";
+ final TableReference tableRef1 = BigQueryHelpers.parseTableSpec(tableSpec1);
+ final TableReference tableRef2 = BigQueryHelpers.parseTableSpec(tableSpec2);
+ final TableReference tableRef3 = BigQueryHelpers.parseTableSpec(tableSpec3);
+
+ TableSchema tableSchema =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER"),
+ new TableFieldSchema().setName("tablespec").setType("STRING")));
+ fakeDatasetService.createTable(new Table().setTableReference(tableRef1).setSchema(tableSchema));
+ fakeDatasetService.createTable(new Table().setTableReference(tableRef2).setSchema(tableSchema));
+ fakeDatasetService.createTable(new Table().setTableReference(tableRef3).setSchema(tableSchema));
+
+ LongFunction<Iterable<TableRow>> getRow =
+ (LongFunction<Iterable<TableRow>> & Serializable)
+ (long i) -> {
+ TableRow tableRow;
+ if (i < 5) {
+ tableRow = new TableRow().set("name", "name" + i).set("number", Long.toString(i));
+ } else {
+ tableRow =
+ new TableRow()
+ .set("name", "name" + i)
+ .set("number", Long.toString(i))
+ .set("double_number", Long.toString(i * 2));
+ }
+ return ImmutableList.of(
+ tableRow.clone().set("tablespec", tableSpec1),
+ tableRow.clone().set("tablespec", tableSpec2),
+ tableRow.clone().set("tablespec", tableSpec3));
+ };
+
+ SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> tableFunction =
+ value -> new TableDestination((String) value.getValue().get("tablespec"), "");
+
+ final int numRowsPerTable = 1;
+ TestStream.Builder<Long> testStream =
TestStream.create(VarLongCoder.of()).advanceWatermarkTo(new Instant(0));
+ for (long i = 0; i < numRowsPerTable; ++i) {
+ testStream = testStream.addElements(i);
+ }
+ testStream = testStream.advanceProcessingTime(Duration.standardSeconds(5));
+
Review Comment:
Oh, good catch. I had reduced numRowsPerTable to 1 to debug something and forgot to increase it back. Also, after fixing this the test fails because the DirectRunner doesn't retry failed bundles :( Is there any way around this? I can't retry inside the transform, because then the side input won't update.
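
For illustration only (this is not the actual BigQueryIO conversion step; the class and field names below are made up), here is a minimal sketch of why retrying inside the transform can't pick up a refreshed schema: the side-input value is fixed for the lifetime of a bundle, so only a bundle-level failure and runner retry re-reads it.

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;

// Hypothetical conversion DoFn, sketched only to show the side-input semantics.
class ConvertWithCachedSchema extends DoFn<TableRow, TableRow> {
  private final PCollectionView<Map<String, TableSchema>> schemaView;

  ConvertWithCachedSchema(PCollectionView<Map<String, TableSchema>> schemaView) {
    this.schemaView = schemaView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // The side-input value is fixed for the duration of this bundle, so looping
    // here and re-reading it never observes a newer schema. Only failing the
    // bundle and having the runner retry it re-reads the side input, which is
    // exactly the retry behavior the DirectRunner doesn't provide.
    Map<String, TableSchema> schemas = c.sideInput(schemaView);
    // ... conversion against `schemas` elided ...
    c.output(c.element());
  }
}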