mutianf commented on code in PR #24015:
URL: https://github.com/apache/beam/pull/24015#discussion_r1046418159


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java:
##########
@@ -535,43 +578,84 @@ public String toString() {
 
   @Override
   public Reader createReader(BigtableSource source) throws IOException {
-    BigtableSession session = new BigtableSession(options);
+    LOG.info(
+        "Creating a Reader for Bigtable with settings: " + 
veneeringSettings.getDataSettings());
+    BigtableDataClient client = 
BigtableDataClient.create(veneeringSettings.getDataSettings());
     if (source.getMaxBufferElementCount() != null) {
-      return BigtableSegmentReaderImpl.create(session, source);
+      return BigtableSegmentReaderImpl.create(
+          client,
+          veneeringSettings.getDataSettings(),
+          source,
+          veneeringSettings.getOperationTimeouts());
     } else {
-      return new BigtableReaderImpl(session, source);
+      return new BigtableReaderImpl(
+          client,
+          veneeringSettings.getDataSettings(),
+          source,
+          veneeringSettings.getOperationTimeouts());
     }
   }
 
+  // Support 2 bigtable-hbase features not directly available in veneer:
+  // - disabling timeouts - when timeouts are disabled, bigtable-hbase ignores 
user configured
+  //   timeouts and forces 6 minute deadlines per attempt for all RPCs except 
scans. This is
+  //   implemented by an interceptor. However the interceptor must be informed 
that this is a scan
+  // - per attempt deadlines - vener doesn't implement deadlines for attempts. 
To workaround this,
+  //   the timeouts are set per call in the ApiCallContext. However this 
creates a separate issue of
+  //   over running the operation deadline, so gRPC deadline is also set.
+  private static GrpcCallContext createScanCallContext(
+      BigtableIOOperationTimeouts operationTimeouts) {
+    GrpcCallContext ctx = GrpcCallContext.createDefault();
+
+    if (!operationTimeouts.getUseTimeouts()) {
+      ctx =
+          ctx.withCallOptions(
+              CallOptions.DEFAULT.withOption(
+                  
BigtableHBaseVeneeringSettings.NoTimeoutsInterceptor.SKIP_DEFAULT_ATTEMPT_TIMEOUT,
+                  true));
+    } else {
+      if 
(operationTimeouts.getBulkReadRowsTimeouts().getOperationTimeout().isPresent()) 
{
+        ctx.withCallOptions(

Review Comment:
   Good catch! Thanks :D 



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java:
##########
@@ -1657,26 +1657,25 @@ void setupSampleRowKeys(String tableId, int numSamples, 
long bytesPerRow) {
       checkArgument(numSamples > 0, "Number of samples must be positive: %s", 
numSamples);
       checkArgument(bytesPerRow > 0, "Bytes/Row must be positive: %s", 
bytesPerRow);
 
-      ImmutableList.Builder<SampleRowKeysResponse> ret = 
ImmutableList.builder();
+      ImmutableList.Builder<KeyOffset> ret = ImmutableList.builder();
       SortedMap<ByteString, ByteString> rows = getTable(tableId);
+      System.out.println("num of rows = " + rows.size());

Review Comment:
   oops :P 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to