acrites commented on code in PR #29963:
URL: https://github.com/apache/beam/pull/29963#discussion_r1458032802
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -192,11 +205,35 @@ public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> active) {
                 .addRequests(request));
       }
     }
+    for (Map.Entry<String, List<HeartbeatRequest>> entry : heartbeats.entrySet()) {
+      for (HeartbeatRequest request : entry.getValue()) {
+        // Calculate the bytes with some overhead for proto encoding.
+        long bytes = (long) entry.getKey().length() + request.getSerializedSize() + 10;
+        if (builderBytes > 0
+            && (builderBytes + bytes > AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE
+                || builder.getRequestIdCount() >= streamingRpcBatchLimit)) {
+          send(builder.build());
+          builderBytes = 0;
+          builder.clear();
+        }
+        builderBytes += bytes;
+        builder.addComputationHeartbeatRequest(
+            ComputationHeartbeatRequest.newBuilder()
+                .setComputationId(entry.getKey())
Review Comment:
GrpcWindmillServerTest.testStreamingGetDataHeartbeatsAsHeartbeatRequests
should now do the right thing. Before, the test was exploding any requests sent
before verifying, so we always got one key per ComputationHeartbeatRequest.
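
For reference, the flush rule in the hunk above can be sketched in isolation. This is a minimal sketch under stated assumptions, not the Beam code: `HeartbeatBatchingSketch` and `batch` are hypothetical names, plain integer sizes stand in for `HeartbeatRequest.getSerializedSize()`, and the batch limit is applied to the number of entries in the current batch rather than to `builder.getRequestIdCount()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative sketch only; names and types here are hypothetical, not Beam APIs. */
final class HeartbeatBatchingSketch {
  // Same fixed allowance for proto encoding overhead as in the diff.
  static final int PROTO_OVERHEAD_BYTES = 10;

  /**
   * Groups per-computation request sizes into batches. A batch is flushed before
   * adding an item when it is non-empty and either the byte budget would be
   * exceeded or the entry limit has been reached. Entries are "computationId:size".
   */
  static List<List<String>> batch(
      Map<String, List<Integer>> sizesByComputation, long chunkSizeBytes, int batchLimit) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentBytes = 0;
    for (Map.Entry<String, List<Integer>> entry : sizesByComputation.entrySet()) {
      for (int serializedSize : entry.getValue()) {
        long bytes = (long) entry.getKey().length() + serializedSize + PROTO_OVERHEAD_BYTES;
        // Assumption: the limit counts batch entries, not request IDs.
        if (currentBytes > 0
            && (currentBytes + bytes > chunkSizeBytes || current.size() >= batchLimit)) {
          batches.add(current); // stands in for send(builder.build())
          current = new ArrayList<>();
          currentBytes = 0;
        }
        currentBytes += bytes;
        current.add(entry.getKey() + ":" + serializedSize);
      }
    }
    // Flush whatever remains after the loops.
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

Because the flush check requires a non-empty batch, a single item larger than the byte budget is still sent on its own instead of being dropped. Several heartbeats for the same computation can also end up in one batch, which is the case the test should now be able to observe.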