[ 
https://issues.apache.org/jira/browse/BEAM-8023?focusedWorklogId=299726&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-299726
 ]

ASF GitHub Bot logged work on BEAM-8023:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Aug/19 20:33
            Start Date: 22/Aug/19 20:33
    Worklog Time Spent: 10m 
      Work Description: jklukas commented on pull request #9405: [BEAM-8023] Add value provider interfaces for BigQueryIO.Read using Method.DIRECT_READ
URL: https://github.com/apache/beam/pull/9405#discussion_r316872643
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
 ##########
 @@ -465,6 +500,71 @@ public void testTableSourceInitialSplit_WithTableReadOptions() throws Throwable
         BigQueryStorageTableSource.create(
             ValueProvider.StaticValueProvider.of(tableRef),
             readOptions,
+            null,
+            null,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices()
+                .withDatasetService(fakeDatasetService)
+                .withStorageClient(fakeStorageClient));
+
 +    List<? extends BoundedSource<TableRow>> sources = tableSource.split(10L, options);
+    assertEquals(10L, sources.size());
+  }
+
+  @Test
 +  public void testTableSourceInitialSplit_WithSelectedFieldsAndRowRestriction() throws Exception {
 +    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
 +    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+
+    Table table =
+        new Table()
+            .setTableReference(tableRef)
+            .setNumBytes(100L)
+            .setSchema(
+                new TableSchema()
+                    .setFields(
+                        ImmutableList.of(
 +                            new TableFieldSchema().setName("name").setType("STRING"),
 +                            new TableFieldSchema().setName("number").setType("INTEGER"))));
+
+    fakeDatasetService.createTable(table);
+
+    TableReadOptions readOptions =
+        TableReadOptions.newBuilder()
+            .addSelectedFields("name")
+            .addSelectedFields("number")
+            .setRowRestriction("number > 5")
+            .build();
+
+    CreateReadSessionRequest expectedRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
+            .setRequestedStreams(10)
+            .setReadOptions(readOptions)
 +            // TODO(aryann): Once we rebuild the generated client code, we should change this to
 +            // use setShardingStrategy().
+            .setUnknownFields(
+                UnknownFieldSet.newBuilder()
 +                    .addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2).build())
+                    .build())
+            .build();
+
+    ReadSession.Builder builder = ReadSession.newBuilder();
+    for (int i = 0; i < 10; i++) {
+      builder.addStreams(Stream.newBuilder().setName("stream-" + i));
+    }
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
 +    when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(tableRef),
+            null,
+            StaticValueProvider.of(Lists.newArrayList("name", "number")),
+            StaticValueProvider.of("number > 5"),
 
 Review comment:
   Would it be more appropriate to use `p.newProvider` here rather than 
`StaticValueProvider.of`, so that potential misuses of the `ValueProvider` are 
caught before we hit runtime? If the `StaticValueProvider` style is already 
predominant in this file, I'm fine with keeping it as-is.
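 
   To illustrate the concern, here is a minimal plain-Java sketch. It does not 
use Beam: `Provider`, `StaticProvider`, `RuntimeOnlyProvider`, `EagerSource` and 
`LazySource` are hypothetical stand-ins invented for this example, and the 
runtime-only provider only approximates what I understand `p.newProvider` to 
give you (a value that cannot be read at construction time). The point is that 
a static provider lets a premature `get()` slip through, while a runtime-only 
provider makes it fail fast.

```java
public class ValueProviderSketch {

  /** Hypothetical, simplified stand-in for a value provider (not Beam's interface). */
  public interface Provider<T> {
    T get();

    boolean isAccessible();
  }

  /** Value known at construction time, in the spirit of StaticValueProvider.of(...). */
  public static final class StaticProvider<T> implements Provider<T> {
    private final T value;

    public StaticProvider(T value) {
      this.value = value;
    }

    @Override
    public T get() {
      return value;
    }

    @Override
    public boolean isAccessible() {
      return true;
    }
  }

  /** Value that can only be read after markRunning() has been called. */
  public static final class RuntimeOnlyProvider<T> implements Provider<T> {
    private final T value;
    private boolean running = false;

    public RuntimeOnlyProvider(T value) {
      this.value = value;
    }

    public void markRunning() {
      running = true;
    }

    @Override
    public T get() {
      if (!running) {
        throw new IllegalStateException("Value accessed before runtime");
      }
      return value;
    }

    @Override
    public boolean isAccessible() {
      return running;
    }
  }

  /** Hypothetical buggy source: reads the provider while being constructed. */
  public static final class EagerSource {
    public final String restriction;

    public EagerSource(Provider<String> provider) {
      this.restriction = provider.get();
    }
  }

  /** Hypothetical correct source: defers the read until the value is needed. */
  public static final class LazySource {
    private final Provider<String> provider;

    public LazySource(Provider<String> provider) {
      this.provider = provider;
    }

    public String restriction() {
      return provider.get();
    }
  }

  public static void main(String[] args) {
    // With a static provider the premature read goes unnoticed.
    EagerSource ok = new EagerSource(new StaticProvider<>("number > 5"));
    System.out.println("static provider, eager access: " + ok.restriction);

    // With a runtime-only provider the same read fails at construction time.
    RuntimeOnlyProvider<String> deferred = new RuntimeOnlyProvider<>("number > 5");
    try {
      new EagerSource(deferred);
    } catch (IllegalStateException e) {
      System.out.println("runtime-only provider, eager access: " + e.getMessage());
    }

    // A source that defers the read works once the provider is marked as running.
    LazySource lazy = new LazySource(deferred);
    deferred.markRunning();
    System.out.println("runtime-only provider, lazy access: " + lazy.restriction());
  }
}
```

   A test written against the runtime-only flavour would therefore flag any 
code path that resolves the selected fields or row restriction too early.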
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 299726)
    Time Spent: 1h  (was: 50m)

> Allow specifying BigQuery Storage API readOptions at runtime
> ------------------------------------------------------------
>
>                 Key: BEAM-8023
>                 URL: https://issues.apache.org/jira/browse/BEAM-8023
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Jeff Klukas
>            Assignee: Kenneth Jung
>            Priority: Minor
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> We have support in the Java SDK for using the BigQuery Storage API for reads, 
> but only the target query or table is supported as a ValueProvider to be 
> specified at runtime. AFAICT, there is no reason we can't delay specifying 
> readOptions until runtime as well.
> The readOptions are accessed by BigQueryStorageTableSource in getTargetTable; 
> I believe that's occurring at runtime, but I'd love for someone with deeper 
> BoundedSource knowledge to confirm that.
> I'd advocate for adding new methods 
> `TypedRead.withSelectedFields(ValueProvider<List<String>> value)` and 
> `TypedRead.withRowRestriction(ValueProvider<String> value)`. The existing 
> `withReadOptions` method would then populate the other two as 
> StaticValueProviders. Perhaps we'd want to deprecate `withReadOptions` in 
> favor of specifying individual read options as separate parameters.
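
The proposal in the description above could look roughly like the following 
plain-Java sketch. Everything here is a hypothetical stand-in: `Read` plays the 
role of `TypedRead`, `Options` the role of the read options object, and 
`java.util.function.Supplier` substitutes for `ValueProvider`. It only shows 
the delegation idea (`withReadOptions` populating the two per-option setters 
with already-known values), not how Beam implements it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class ReadOptionsSketch {

  /** Hypothetical stand-in for the read options object. */
  public static final class Options {
    final List<String> selectedFields;
    final String rowRestriction;

    public Options(List<String> selectedFields, String rowRestriction) {
      this.selectedFields = selectedFields;
      this.rowRestriction = rowRestriction;
    }
  }

  /** Hypothetical stand-in for TypedRead; Supplier stands in for ValueProvider. */
  public static final class Read {
    private final Supplier<List<String>> selectedFields;
    private final Supplier<String> rowRestriction;

    private Read(Supplier<List<String>> selectedFields, Supplier<String> rowRestriction) {
      this.selectedFields = selectedFields;
      this.rowRestriction = rowRestriction;
    }

    public static Read create() {
      return new Read(() -> null, () -> null);
    }

    /** Proposed per-option setter: the fields are resolved lazily. */
    public Read withSelectedFields(Supplier<List<String>> value) {
      return new Read(value, rowRestriction);
    }

    /** Proposed per-option setter: the restriction is resolved lazily. */
    public Read withRowRestriction(Supplier<String> value) {
      return new Read(selectedFields, value);
    }

    /** Existing-style setter that delegates to the two setters with fixed values. */
    public Read withReadOptions(Options options) {
      return withSelectedFields(() -> options.selectedFields)
          .withRowRestriction(() -> options.rowRestriction);
    }

    public List<String> resolveSelectedFields() {
      return selectedFields.get();
    }

    public String resolveRowRestriction() {
      return rowRestriction.get();
    }
  }

  public static void main(String[] args) {
    Read read =
        Read.create().withReadOptions(new Options(Arrays.asList("name", "number"), "number > 5"));
    System.out.println(read.resolveSelectedFields());
    System.out.println(read.resolveRowRestriction());
  }
}
```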



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
