igorbernstein2 commented on code in PR #34245:
URL: https://github.com/apache/beam/pull/34245#discussion_r2060301903
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java:
##########
@@ -119,6 +119,27 @@ public void testE2EBigtableRead() {
checkLineageSourceMetric(r, tableId);
}
+ @Test
+ public void testE2EBigtableReadWithSkippingLargeRows() {
+ BigtableOptions.Builder bigtableOptionsBuilder =
+ new
BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId());
+
+ final String tableId = "BigtableReadTest";
+ final long numRows = 1000L;
Review Comment:
I think you can do a bit better here
```java
// Sketch of the proposed test; the "//..." markers stand for code not shown in this snippet.
testE2EBigtableReadWithSkippingLargeRows() {
//...
// add an error injector to trigger large row logic
// Registers InterceptorInjector by class name under the "bigtable_settings_override" experiment.
// NOTE(review): confirm the addExperiment signature — it may expect the options object plus a
// single "name=value" string rather than two separate strings.
ExperimentalOptions.addExperiment("bigtable_settings_override",
InterceptorInjector.class.getName());
//...
}
static class LargeRowErrorInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT>
interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT,
RespT>(next.newCall(method, callOptions)) {
private boolean artificiallyClosed = false;
private int numMsgs = 0;
@Override
public void start(Listener<RespT> responseListener, Metadata
headers) {
super.start(new
ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>() {
@Override
public void onMessage(RespT message) {
if (++numMsgs > 10) {
artificiallyClosed = true;
delegate().onClose(
Status.WHATEVER_ERROR_TRIGGERS_PAGING,
new Metadata()
);
return;
}
super.onMessage(message);
}
@Override
public void onClose(Status status, Metadata
trailers) {
if (!artificiallyClosed) {
super.onClose(status, trailers);
}
}
}, headers);
}
};
}
}
/**
 * Settings override that installs {@link LargeRowErrorInterceptor} on the Bigtable data client's
 * gRPC channel.
 *
 * <p>Any channel configurator already present on the transport channel provider is preserved and
 * applied before the interceptor is added.
 */
public static class InterceptorInjector
    implements BiFunction<
        BigtableDataSettings.Builder, PipelineOptions, BigtableDataSettings.Builder> {
  /**
   * Adds the error-injecting interceptor to the channel configuration of {@code builder}.
   *
   * @param builder the data settings builder to modify
   * @param pipelineOptions the pipeline options (unused)
   * @return the same {@code builder}, with the updated transport channel provider installed
   */
  @Override
  public BigtableDataSettings.Builder apply(
      BigtableDataSettings.Builder builder, PipelineOptions pipelineOptions) {
    InstantiatingGrpcChannelProvider.Builder transportChannelProvider =
        ((InstantiatingGrpcChannelProvider) builder.stubSettings().getTransportChannelProvider())
            .toBuilder();
    ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldConf =
        transportChannelProvider.getChannelConfigurator();
    transportChannelProvider.setChannelConfigurator(
        b -> {
          // Chain the previously configured configurator, if any, so it is not lost.
          if (oldConf != null) {
            b = oldConf.apply(b);
          }
          return b.intercept(new LargeRowErrorInterceptor());
        });
    // toBuilder() produced a detached copy: it has to be built and set back on the settings,
    // otherwise the interceptor is never installed.
    builder.stubSettings().setTransportChannelProvider(transportChannelProvider.build());
    // Return the modified builder; returning null would leave the caller without settings.
    return builder;
  }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]