igorbernstein2 commented on code in PR #34245:
URL: https://github.com/apache/beam/pull/34245#discussion_r2060301903


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java:
##########
@@ -119,6 +119,27 @@ public void testE2EBigtableRead() {
     checkLineageSourceMetric(r, tableId);
   }
 
+  @Test
+  public void testE2EBigtableReadWithSkippingLargeRows() {
+    BigtableOptions.Builder bigtableOptionsBuilder =
+        new 
BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId());
+
+    final String tableId = "BigtableReadTest";
+    final long numRows = 1000L;

Review Comment:
   I think you can do a bit better here
   
   
   ```java
   testE2EBigtableReadWithSkippingLargeRows() {
      //...
      // add an error injector to trigger large row logic
      // NOTE(review): this call is shorthand in a sketch -- confirm the actual
      // ExperimentalOptions.addExperiment parameters, and how the injector class
      // name must be encoded in the experiment string, before copying this line.
      ExperimentalOptions.addExperiment("bigtable_settings_override", 
InterceptorInjector.class.getName());
     //...
   }
   /**
    * Test-only gRPC interceptor that lets the first ten response messages of a call through and
    * then ends the call with an injected error status, so the caller's error handling runs.
    *
    * <p>The injected status is still the placeholder from the original sketch and has to be
    * replaced with the status that actually triggers the large-row handling.
    */
   static class LargeRowErrorInterceptor implements ClientInterceptor {
           @Override
           public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
                   MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
               return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
                       next.newCall(method, callOptions)) {
                   // Set once the injected error has been delivered to the caller's listener.
                   private boolean artificiallyClosed = false;
                   // Number of response messages observed on this call so far.
                   private int numMsgs = 0;

                   @Override
                   public void start(Listener<RespT> responseListener, Metadata headers) {
                       // The forwarding listener must wrap the caller's listener; without the
                       // delegate there is nothing to forward messages or the close event to.
                       super.start(
                               new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
                                       responseListener) {
                                   @Override
                                   public void onMessage(RespT message) {
                                       if (artificiallyClosed) {
                                           // The caller already saw the injected close; drop
                                           // anything that still arrives instead of closing again.
                                           return;
                                       }
                                       if (++numMsgs > 10) {
                                           artificiallyClosed = true;
                                           // Stop the real call so it does not stay open after the
                                           // caller has been told that it ended.
                                           cancel("artificially closed by LargeRowErrorInterceptor", null);
                                           delegate().onClose(
                                                   Status.WHATEVER_ERROR_TRIGGERS_PAGING,
                                                   new Metadata());
                                           return;
                                       }
                                       super.onMessage(message);
                                   }

                                   @Override
                                   public void onClose(Status status, Metadata trailers) {
                                       // Suppress the real close once the injected one was
                                       // delivered, so the caller sees exactly one onClose.
                                       if (!artificiallyClosed) {
                                           super.onClose(status, trailers);
                                       }
                                   }
                               },
                               headers);
                   }
               };
           }
       }
       /**
        * Settings override that adds {@link LargeRowErrorInterceptor} to the gRPC channel used by
        * the Bigtable data client, keeping any channel configurator that was already set.
        */
       public static class InterceptorInjector
               implements BiFunction<
                       BigtableDataSettings.Builder, PipelineOptions, BigtableDataSettings.Builder> {
           /**
            * Installs the error-injecting interceptor on the builder's transport channel provider.
            *
            * @param builder settings builder to modify; its transport channel provider is cast to
            *     {@code InstantiatingGrpcChannelProvider}
            * @param pipelineOptions unused
            * @return the same {@code builder}, now carrying the modified channel provider
            */
           @Override
           public BigtableDataSettings.Builder apply(
                   BigtableDataSettings.Builder builder, PipelineOptions pipelineOptions) {
               InstantiatingGrpcChannelProvider.Builder transportChannelProvider =
                       ((InstantiatingGrpcChannelProvider)
                                       builder.stubSettings().getTransportChannelProvider())
                               .toBuilder();
               ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldConf =
                       transportChannelProvider.getChannelConfigurator();
               transportChannelProvider.setChannelConfigurator(b -> {
                   // Run the previously configured step first so it is not lost.
                   if (oldConf != null) {
                       b = oldConf.apply(b);
                   }
                   return b.intercept(new LargeRowErrorInterceptor());
               });
               // toBuilder() produced a separate builder, so the change only takes effect once the
               // rebuilt provider is put back on the settings builder.
               builder.stubSettings().setTransportChannelProvider(transportChannelProvider.build());
               return builder;
           }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to