igorbernstein2 commented on code in PR #34245: URL: https://github.com/apache/beam/pull/34245#discussion_r2060301903
########## sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java: ########## @@ -119,6 +119,27 @@ public void testE2EBigtableRead() { checkLineageSourceMetric(r, tableId); } + @Test + public void testE2EBigtableReadWithSkippingLargeRows() { + BigtableOptions.Builder bigtableOptionsBuilder = + new BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId()); + + final String tableId = "BigtableReadTest"; + final long numRows = 1000L; Review Comment: I think you can do a bit better here ```java testE2EBigtableReadWithSkippingLargeRows() { //... // add an error injector to trigger large row logic ExperimentalOptions.addExperiment("bigtable_settings_override", InterceptorInjector.class.getName()); //... } static class LargeRowErrorInterceptor implements ClientInterceptor { @Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { private boolean artificiallyClosed = false; private int numMsgs = 0; @Override public void start(Listener<RespT> responseListener, Metadata headers) { super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) { @Override public void onMessage(RespT message) { if (++numMsgs > 10) { artificiallyClosed = true; delegate().onClose( Status.WHATEVER_ERROR_TRIGGERS_PAGING, new Metadata() ); return; } super.onMessage(message); } @Override public void onClose(Status status, Metadata trailers) { if (!artificiallyClosed) { super.onClose(status, trailers); } } }, headers); } }; } } public static class InterceptorInjector implements BiFunction<BigtableDataSettings.Builder, PipelineOptions, BigtableDataSettings.Builder> { @Override public BigtableDataSettings.Builder apply(BigtableDataSettings.Builder builder, PipelineOptions pipelineOptions) { 
InstantiatingGrpcChannelProvider.Builder transportChannelProvider = ((InstantiatingGrpcChannelProvider) builder.stubSettings() .getTransportChannelProvider()) .toBuilder(); ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldConf = transportChannelProvider.getChannelConfigurator(); transportChannelProvider.setChannelConfigurator(b -> { if (oldConf != null) { b = oldConf.apply(b); } return b.intercept(new LargeRowErrorInterceptor()); }); builder.stubSettings().setTransportChannelProvider(transportChannelProvider.build()); return builder; } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org