reuvenlax commented on code in PR #23615:
URL: https://github.com/apache/beam/pull/23615#discussion_r1002750533
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -1932,6 +1937,141 @@ public TableRow apply(Long input) {
TableRow.class)));
}
+ @Test
+ public void testUpdateTableSchemaPeriodicRefresh() throws Exception {
+ if (!useStreaming || !useStorageApi) {
+ return;
+ }
+ // Side inputs only update in between bundles. So we set schemaUpdateRetries to 1, to ensure
+ // that the conversion step gives up when the side input is disabled. We then have to disable
+ // the failed-rows collection (dead letter) to ensure that the bundle actually fails and is retried.
+ BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
+ bqOptions.setSchemaUpdateRetries(1);
+ bqOptions.setDisableStorageApiFailedRowsCollection(true);
+
+ BigQueryIO.Write.Method method =
+ useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API;
+ p.enableAbandonedNodeEnforcement(false);
+
+ final String tableSpec1 = "project-id:dataset-id.table1";
+ final String tableSpec2 = "project-id:dataset-id.table2";
+ final String tableSpec3 = "project-id:dataset-id.table3";
+ final TableReference tableRef1 = BigQueryHelpers.parseTableSpec(tableSpec1);
+ final TableReference tableRef2 = BigQueryHelpers.parseTableSpec(tableSpec2);
+ final TableReference tableRef3 = BigQueryHelpers.parseTableSpec(tableSpec3);
+
+ TableSchema tableSchema =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER"),
+ new TableFieldSchema().setName("tablespec").setType("STRING")));
+ fakeDatasetService.createTable(new Table().setTableReference(tableRef1).setSchema(tableSchema));
+ fakeDatasetService.createTable(new Table().setTableReference(tableRef2).setSchema(tableSchema));
+ fakeDatasetService.createTable(new Table().setTableReference(tableRef3).setSchema(tableSchema));
+
+ LongFunction<Iterable<TableRow>> getRow =
+ (LongFunction<Iterable<TableRow>> & Serializable)
+ (long i) -> {
+ TableRow tableRow;
+ if (i < 5) {
+ tableRow = new TableRow().set("name", "name" + i).set("number", Long.toString(i));
+ } else {
+ tableRow =
+ new TableRow()
+ .set("name", "name" + i)
+ .set("number", Long.toString(i))
+ .set("double_number", Long.toString(i * 2));
+ }
+ return ImmutableList.of(
+ tableRow.clone().set("tablespec", tableSpec1),
+ tableRow.clone().set("tablespec", tableSpec2),
+ tableRow.clone().set("tablespec", tableSpec3));
+ };
+
+ SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> tableFunction =
+ value -> new TableDestination((String) value.getValue().get("tablespec"), "");
+
+ final int numRowsPerTable = 1;
+ TestStream.Builder<Long> testStream =
+ TestStream.create(VarLongCoder.of()).advanceWatermarkTo(new Instant(0));
+ for (long i = 0; i < numRowsPerTable; ++i) {
+ testStream = testStream.addElements(i);
+ }
+ testStream = testStream.advanceProcessingTime(Duration.standardSeconds(5));
+
+ PCollection<TableRow> tableRows =
+ p.apply(testStream.advanceWatermarkToInfinity())
+ .apply(
+ MapElements.via(
+ new SimpleFunction<Long, Iterable<TableRow>>() {
+ @Override
+ public Iterable<TableRow> apply(Long input) {
+ return getRow.apply(input);
+ }
+ }))
+ .setCoder(IterableCoder.of(TableRowJsonCoder.of()))
+ .apply(Flatten.iterables());
+ tableRows.apply(
+ BigQueryIO.writeTableRows()
+ .to(tableFunction)
+ .withMethod(method)
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
+ .withSchemaRefreshFrequency(Duration.standardSeconds(2))
+ .withTestServices(fakeBqServices)
+ .withoutValidation());
+
+ Thread thread =
+ new Thread(
+ () -> {
+ try {
+ Thread.sleep(5000);
Review Comment:
I don't see a way to do it with a special element. DirectRunner blocks on
element completion, so we'll never get to the special element. We could add a
transform that uses processing-time timers to update the schema; however, that's
not really any different from using a Thread.
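
For illustration only, here is a minimal sketch of the timer-based alternative mentioned
above, assuming a keyed input (timers require keyed state) and a hypothetical
`updateSchema()` hook standing in for whatever call would mutate the table schema in
`fakeDatasetService`. It arms a processing-time timer and performs the update when the
timer fires:

```java
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Sketch only, not part of this PR: a DoFn that uses a processing-time timer to
// trigger a schema update after the pipeline has been running for a while.
class PeriodicSchemaUpdateFn extends DoFn<KV<Integer, Long>, Long> {

  @TimerId("refresh")
  private final TimerSpec refreshSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(
      @Element KV<Integer, Long> element,
      @TimerId("refresh") Timer refresh,
      OutputReceiver<Long> out) {
    // Arm (or re-arm) a processing-time timer relative to now; elements pass through unchanged.
    refresh.offset(Duration.standardSeconds(5)).setRelative();
    out.output(element.getValue());
  }

  @OnTimer("refresh")
  public void onRefresh() {
    // Hypothetical hook: mutate the table schema here (e.g. against fakeDatasetService),
    // mimicking what the Thread.sleep(5000) block in the test does.
    updateSchema();
  }

  private void updateSchema() {
    // Placeholder for the actual schema-update call.
  }
}
```

As noted above, the timer still only fires in between bundles, so for this test it buys
nothing over the sleeping Thread.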