pabloem commented on code in PR #22926:
URL: https://github.com/apache/beam/pull/22926#discussion_r961293340
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java:
##########
@@ -1481,6 +1482,82 @@ public void testReadFromBigQueryIOWithTrimmedSchema()
throws Exception {
p.run();
}
+ @Test
+ public void testReadFromBigQueryIOWithBeamSchema() throws Exception {
+ fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+ TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+ Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
+ fakeDatasetService.createTable(table);
+
+ CreateReadSessionRequest expectedCreateReadSessionRequest =
+ CreateReadSessionRequest.newBuilder()
+ .setParent("projects/project-id")
+ .setReadSession(
+ ReadSession.newBuilder()
+ .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+ .setReadOptions(
+ ReadSession.TableReadOptions.newBuilder().addSelectedFields("name"))
+ .setDataFormat(DataFormat.AVRO))
+ .setMaxStreamCount(10)
+ .build();
+
+ ReadSession readSession =
+ ReadSession.newBuilder()
+ .setName("readSessionName")
+ .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
+ .addStreams(ReadStream.newBuilder().setName("streamName"))
+ .setDataFormat(DataFormat.AVRO)
+ .build();
+
+ ReadRowsRequest expectedReadRowsRequest =
+ ReadRowsRequest.newBuilder().setReadStream("streamName").build();
+
+ List<GenericRecord> records =
+ Lists.newArrayList(
+ createRecord("A", TRIMMED_AVRO_SCHEMA),
+ createRecord("B", TRIMMED_AVRO_SCHEMA),
+ createRecord("C", TRIMMED_AVRO_SCHEMA),
+ createRecord("D", TRIMMED_AVRO_SCHEMA));
+
+ List<ReadRowsResponse> readRowsResponses =
+ Lists.newArrayList(
+ createResponse(TRIMMED_AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50),
+ createResponse(TRIMMED_AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75));
+
+ StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
+ when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
+ .thenReturn(readSession);
+ when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
+ .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
+
+ PCollection<Row> output =
+ p.apply(
+ BigQueryIO.readTableRowsWithSchema()
+ .from("foo.com:project:dataset.table")
+ .withMethod(Method.DIRECT_READ)
+ .withSelectedFields(Lists.newArrayList("name"))
+ .withFormat(DataFormat.AVRO)
+ .withTestServices(
+ new FakeBigQueryServices()
+ .withDatasetService(fakeDatasetService)
+ .withStorageClient(fakeStorageClient)))
+ .apply(Convert.toRows());
+
+ org.apache.beam.sdk.schemas.Schema beamSchema =
+ org.apache.beam.sdk.schemas.Schema.of(
+ org.apache.beam.sdk.schemas.Schema.Field.of(
+ "name", org.apache.beam.sdk.schemas.Schema.FieldType.STRING));
+ PAssert.that(output)
+ .containsInAnyOrder(
+ ImmutableList.of(
+ Row.withSchema(beamSchema).addValue("A").build(),
+ Row.withSchema(beamSchema).addValue("B").build(),
+ Row.withSchema(beamSchema).addValue("C").build(),
+ Row.withSchema(beamSchema).addValue("D").build()));
+
+ p.run();
Review Comment:
added
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java:
##########
@@ -1481,6 +1482,82 @@ public void testReadFromBigQueryIOWithTrimmedSchema()
throws Exception {
p.run();
}
+ @Test
+ public void testReadFromBigQueryIOWithBeamSchema() throws Exception {
+ fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+ TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+ Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
+ fakeDatasetService.createTable(table);
+
+ CreateReadSessionRequest expectedCreateReadSessionRequest =
+ CreateReadSessionRequest.newBuilder()
+ .setParent("projects/project-id")
+ .setReadSession(
+ ReadSession.newBuilder()
+ .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+ .setReadOptions(
+ ReadSession.TableReadOptions.newBuilder().addSelectedFields("name"))
+ .setDataFormat(DataFormat.AVRO))
+ .setMaxStreamCount(10)
+ .build();
+
+ ReadSession readSession =
+ ReadSession.newBuilder()
+ .setName("readSessionName")
+ .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
+ .addStreams(ReadStream.newBuilder().setName("streamName"))
+ .setDataFormat(DataFormat.AVRO)
+ .build();
+
+ ReadRowsRequest expectedReadRowsRequest =
+ ReadRowsRequest.newBuilder().setReadStream("streamName").build();
+
+ List<GenericRecord> records =
+ Lists.newArrayList(
+ createRecord("A", TRIMMED_AVRO_SCHEMA),
+ createRecord("B", TRIMMED_AVRO_SCHEMA),
+ createRecord("C", TRIMMED_AVRO_SCHEMA),
+ createRecord("D", TRIMMED_AVRO_SCHEMA));
+
+ List<ReadRowsResponse> readRowsResponses =
+ Lists.newArrayList(
+ createResponse(TRIMMED_AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50),
+ createResponse(TRIMMED_AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75));
+
+ StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
+ when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
+ .thenReturn(readSession);
+ when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
+ .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
+
+ PCollection<Row> output =
+ p.apply(
+ BigQueryIO.readTableRowsWithSchema()
+ .from("foo.com:project:dataset.table")
+ .withMethod(Method.DIRECT_READ)
+ .withSelectedFields(Lists.newArrayList("name"))
+ .withFormat(DataFormat.AVRO)
Review Comment:
yes, it does (I've tested it on the syndeo template... and I've added an
integration test)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]