[ https://issues.apache.org/jira/browse/BEAM-8023?focusedWorklogId=299726&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299726 ]
ASF GitHub Bot logged work on BEAM-8023:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Aug/19 20:33
            Start Date: 22/Aug/19 20:33
    Worklog Time Spent: 10m
      Work Description: jklukas commented on pull request #9405: [BEAM-8023] Add value provider interfaces for BigQueryIO.Read using Method.DIRECT_READ
URL: https://github.com/apache/beam/pull/9405#discussion_r316872643

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
##########
@@ -465,6 +500,71 @@ public void testTableSourceInitialSplit_WithTableReadOptions() throws Throwable
         BigQueryStorageTableSource.create(
             ValueProvider.StaticValueProvider.of(tableRef),
             readOptions,
+            null,
+            null,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices()
+                .withDatasetService(fakeDatasetService)
+                .withStorageClient(fakeStorageClient));
+
+    List<? extends BoundedSource<TableRow>> sources = tableSource.split(10L, options);
+    assertEquals(10L, sources.size());
+  }
+
+  @Test
+  public void testTableSourceInitialSplit_WithSelectedFieldsAndRowRestriction() throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+
+    Table table =
+        new Table()
+            .setTableReference(tableRef)
+            .setNumBytes(100L)
+            .setSchema(
+                new TableSchema()
+                    .setFields(
+                        ImmutableList.of(
+                            new TableFieldSchema().setName("name").setType("STRING"),
+                            new TableFieldSchema().setName("number").setType("INTEGER"))));
+
+    fakeDatasetService.createTable(table);
+
+    TableReadOptions readOptions =
+        TableReadOptions.newBuilder()
+            .addSelectedFields("name")
+            .addSelectedFields("number")
+            .setRowRestriction("number > 5")
+            .build();
+
+    CreateReadSessionRequest expectedRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
+            .setRequestedStreams(10)
+            .setReadOptions(readOptions)
+            // TODO(aryann): Once we rebuild the generated client code, we should change this to
+            // use setShardingStrategy().
+            .setUnknownFields(
+                UnknownFieldSet.newBuilder()
+                    .addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2).build())
+                    .build())
+            .build();
+
+    ReadSession.Builder builder = ReadSession.newBuilder();
+    for (int i = 0; i < 10; i++) {
+      builder.addStreams(Stream.newBuilder().setName("stream-" + i));
+    }
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(tableRef),
+            null,
+            StaticValueProvider.of(Lists.newArrayList("name", "number")),
+            StaticValueProvider.of("number > 5"),

Review comment:
Would it be more appropriate to use `p.newProvider` here rather than `StaticValueProvider.of` to catch potential misuses of the ValueProvider before we hit runtime? If the StaticValueProvider style is already predominant in this file, I'm fine with keeping it as-is.
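The reviewer's concern can be illustrated without Beam itself. The sketch below is hypothetical: `Provider`, `StaticProvider`, `DeferredProvider`, `EagerSource`, `LazySource` and `ProviderMisuseDemo` are simplified stand-ins, not Beam classes. It assumes, as the review comment implies, that a provider obtained from `p.newProvider` cannot be read until the pipeline runs, while `StaticValueProvider.of` can be read at any time. A source that wrongly dereferences its providers during construction therefore passes a test built on static providers but fails one built on deferred providers.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for Beam's ValueProvider<T>; not the real interface.
interface Provider<T> {
  T get();
}

// Analogue of StaticValueProvider.of(...): readable at pipeline-construction time.
final class StaticProvider<T> implements Provider<T> {
  private final T value;

  StaticProvider(T value) { this.value = value; }

  @Override
  public T get() { return value; }
}

// Assumed analogue of a provider from p.newProvider(...): readable only after the
// (simulated) pipeline has started running.
final class DeferredProvider<T> implements Provider<T> {
  private final T value;
  private boolean running = false;

  DeferredProvider(T value) { this.value = value; }

  void startRun() { running = true; }

  @Override
  public T get() {
    if (!running) {
      throw new IllegalStateException("Value only available at runtime");
    }
    return value;
  }
}

// Hypothetical source that misuses its provider by reading it in the constructor.
final class EagerSource {
  final List<String> selectedFields;

  EagerSource(Provider<List<String>> selectedFields) {
    this.selectedFields = selectedFields.get(); // too early: construction time
  }
}

// Hypothetical source that stores the provider and reads it only when needed.
final class LazySource {
  private final Provider<List<String>> selectedFields;

  LazySource(Provider<List<String>> selectedFields) { this.selectedFields = selectedFields; }

  List<String> resolveSelectedFields() {
    return selectedFields.get(); // called at execution time
  }
}

final class ProviderMisuseDemo {
  // Returns true if EagerSource can be constructed with the given provider.
  static boolean eagerConstructionSucceeds(Provider<List<String>> fields) {
    try {
      new EagerSource(fields);
      return true;
    } catch (IllegalStateException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    List<String> fields = Arrays.asList("name", "number");
    System.out.println(eagerConstructionSucceeds(new StaticProvider<>(fields)));   // true: misuse hidden
    System.out.println(eagerConstructionSucceeds(new DeferredProvider<>(fields))); // false: misuse caught

    DeferredProvider<List<String>> deferred = new DeferredProvider<>(fields);
    LazySource lazy = new LazySource(deferred); // fine: nothing is read yet
    deferred.startRun();
    System.out.println(lazy.resolveSelectedFields()); // [name, number]
  }
}
```

In this sketch, only the deferred provider distinguishes `EagerSource` from `LazySource`, which is the kind of early failure the review comment is asking the test to surface.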
Issue Time Tracking
-------------------

    Worklog Id:     (was: 299726)
    Time Spent: 1h  (was: 50m)

> Allow specifying BigQuery Storage API readOptions at runtime
> ------------------------------------------------------------
>
>                 Key: BEAM-8023
>                 URL: https://issues.apache.org/jira/browse/BEAM-8023
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Jeff Klukas
>            Assignee: Kenneth Jung
>            Priority: Minor
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> We have support in the Java SDK for using the BigQuery Storage API for reads,
> but only the target query or table is supported as a ValueProvider to be
> specified at runtime. AFAICT, there is no reason we can't delay specifying
> readOptions until runtime as well.
> The readOptions are accessed by BigQueryStorageTableSource in getTargetTable;
> I believe that's occurring at runtime, but I'd love for someone with deeper
> BoundedSource knowledge to confirm that.
> I'd advocate for adding new methods
> `TypedRead.withSelectedFields(ValueProvider<List<String>> value)` and
> `TypedRead.withRowRestriction(ValueProvider<String> value)`. The existing
> `withReadOptions` method would then populate the other two as
> StaticValueProviders. Perhaps we'd want to deprecate `withReadOptions` in
> favor of specifying individual read options as separate parameters.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
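The API proposed in the issue can be sketched the same way. This is a hypothetical illustration, not Beam's implementation: `RuntimeParam`, `ReadOptionsValue`, `TypedReadSketch` and `TypedReadSketchDemo` stand in for `ValueProvider`, `TableReadOptions` and `BigQueryIO.TypedRead`. It assumes, as the issue suggests, that read options are only needed when the read session is created at runtime. The selected fields and row restriction are therefore stored unevaluated and dereferenced only in `resolveReadOptions()`, and `withReadOptions` wraps already-known values as static params, mirroring the proposal to populate them as StaticValueProviders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for Beam's ValueProvider<T>; not the real interface.
@FunctionalInterface
interface RuntimeParam<T> {
  T get();

  // Analogue of StaticValueProvider.of(value).
  static <T> RuntimeParam<T> of(T value) {
    return () -> value;
  }
}

// Stand-in for the Storage API's TableReadOptions: selected fields plus a row restriction.
final class ReadOptionsValue {
  final List<String> selectedFields;
  final String rowRestriction;

  ReadOptionsValue(List<String> selectedFields, String rowRestriction) {
    this.selectedFields = Collections.unmodifiableList(new ArrayList<>(selectedFields));
    this.rowRestriction = rowRestriction;
  }
}

// Hypothetical sketch of the TypedRead methods proposed in the issue (not Beam's code).
final class TypedReadSketch {
  private final RuntimeParam<List<String>> selectedFields; // null means "not set"
  private final RuntimeParam<String> rowRestriction; // null means "not set"

  TypedReadSketch() {
    this(null, null);
  }

  private TypedReadSketch(
      RuntimeParam<List<String>> selectedFields, RuntimeParam<String> rowRestriction) {
    this.selectedFields = selectedFields;
    this.rowRestriction = rowRestriction;
  }

  TypedReadSketch withSelectedFields(RuntimeParam<List<String>> value) {
    return new TypedReadSketch(value, rowRestriction);
  }

  TypedReadSketch withRowRestriction(RuntimeParam<String> value) {
    return new TypedReadSketch(selectedFields, value);
  }

  // Existing-style entry point: options known at construction time become static params.
  TypedReadSketch withReadOptions(ReadOptionsValue options) {
    return withSelectedFields(RuntimeParam.of(options.selectedFields))
        .withRowRestriction(RuntimeParam.of(options.rowRestriction));
  }

  // Meant to be called only at execution time, when the read session is created;
  // this is the first place the params are dereferenced.
  ReadOptionsValue resolveReadOptions() {
    List<String> fields =
        selectedFields == null ? Collections.<String>emptyList() : selectedFields.get();
    String restriction = rowRestriction == null ? "" : rowRestriction.get();
    return new ReadOptionsValue(fields, restriction);
  }
}

final class TypedReadSketchDemo {
  public static void main(String[] args) {
    // The row restriction is unknown while the "pipeline" is being built.
    AtomicReference<String> restriction = new AtomicReference<>();
    TypedReadSketch read =
        new TypedReadSketch()
            .withSelectedFields(RuntimeParam.of(Arrays.asList("name", "number")))
            .withRowRestriction(restriction::get);

    restriction.set("number > 5"); // supplied later, e.g. from runtime options
    ReadOptionsValue options = read.resolveReadOptions();
    System.out.println("fields=" + options.selectedFields + ", restriction=" + options.rowRestriction);
  }
}
```

Keeping a single resolution point lets the existing `withReadOptions` path and the new runtime-only setters share the same code, which is roughly what the issue's deprecation suggestion would rely on.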