scwhittle commented on code in PR #33512:
URL: https://github.com/apache/beam/pull/33512#discussion_r1908660992
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java:
##########
@@ -161,7 +161,7 @@ private Windmill.StreamingGetWorkResponseChunk createResponse(Windmill.WorkItem
.setInputDataWatermark(1L)
.setDependentRealtimeInputWatermark(1L)
.build())
- .setSerializedWorkItem(workItem.toByteString())
+ .addSerializedWorkItem(workItem.toByteString())
Review Comment:
Could you add a test with batched items (multiple serialized work items in one response chunk)?
##########
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto:
##########
@@ -763,6 +763,8 @@ message StreamingGetWorkRequest {
GetWorkRequest request = 1;
Review Comment:
// Initial message is GetWorkRequest with subsequent messages being extensions.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -184,6 +182,7 @@ private void maybeSendRequestExtension(GetWorkBudget extension) {
Windmill.StreamingGetWorkRequestExtension.newBuilder()
.setMaxItems(extension.items())
.setMaxBytes(extension.bytes()))
+ .setSupportsMultipleWorkItemsInChunk(multipleItemsInGetWorkResponse)
Review Comment:
Not sure we need to set it on the extension.
##########
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto:
##########
@@ -763,6 +763,8 @@ message StreamingGetWorkRequest {
GetWorkRequest request = 1;
StreamingGetWorkRequestExtension request_extension = 2;
}
+
+ optional bool supports_multiple_work_items_in_chunk = 5 [default = false];
Review Comment:
// Ignored after initial request.
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -136,6 +136,11 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
void setUseSeparateWindmillHeartbeatStreams(Boolean value);
+ @Description("If true, GetWorkStreams will request multiple work items in a response chunk.")
+ boolean getWindmillMultipleItemsInGetWorkResponse();
Review Comment:
nit: how about getWindmillRequestBatchedGetWorkResponse?
"Batched" is shorter than "MultipleItemsIn", and "Request" indicates that the
client is asking for it.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -88,6 +80,8 @@ final class GrpcDirectGetWorkStream
*/
private final ConcurrentMap<Long, GetWorkResponseChunkAssembler> workItemAssemblers;
+ private final boolean multipleItemsInGetWorkResponse;
Review Comment:
Ditto on the naming, here and throughout.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java:
##########
@@ -115,6 +120,7 @@ private void sendRequestExtension(long moreItems, long moreBytes) {
StreamingGetWorkRequestExtension.newBuilder()
.setMaxItems(moreItems)
.setMaxBytes(moreBytes))
+ .setSupportsMultipleWorkItemsInChunk(multipleItemsInGetWorkResponse)
Review Comment:
Ditto: set it just on the initial request?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -231,7 +231,15 @@ public void appendSpecificHtml(PrintWriter writer) {
@Override
public void sendHealthCheck() throws WindmillStreamShutdownException {
- trySend(HEALTH_CHECK_REQUEST);
+ trySend(
+ StreamingGetWorkRequest.newBuilder()
+ .setRequestExtension(
+ Windmill.StreamingGetWorkRequestExtension.newBuilder()
+ .setMaxItems(0)
+ .setMaxBytes(0)
+ .build())
+ .setSupportsMultipleWorkItemsInChunk(multipleItemsInGetWorkResponse)
Review Comment:
Ditto (and this could then revert to the HEALTH_CHECK_REQUEST constant).
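To make the suggestion concrete, here is a rough sketch of the shape I have in mind: the flag goes on the initial request only, extensions leave it unset, and the health check stays a shared constant. `Request`, `Stream`, and all field and method names below are hypothetical stand-ins for illustration, not the generated Windmill proto classes or the real stream code. It also assumes the health check is just a zero-budget extension; the only thing taken from the PR is that the server ignores the field after the initial request.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchedGetWorkSketch {

  /** Hypothetical stand-in for a streaming GetWork request (not the Windmill proto). */
  static final class Request {
    final boolean isExtension;
    final long maxItems;
    final long maxBytes;
    // null models an unset optional field; only the initial request sets it.
    final Boolean supportsMultipleWorkItemsInChunk;

    Request(
        boolean isExtension,
        long maxItems,
        long maxBytes,
        Boolean supportsMultipleWorkItemsInChunk) {
      this.isExtension = isExtension;
      this.maxItems = maxItems;
      this.maxBytes = maxBytes;
      this.supportsMultipleWorkItemsInChunk = supportsMultipleWorkItemsInChunk;
    }
  }

  // Assumption: a health check is a zero-budget extension. Because it no longer
  // carries the per-stream flag, one shared constant works for every stream.
  static final Request HEALTH_CHECK_REQUEST = new Request(true, 0, 0, null);

  /** Hypothetical stand-in for the stream; records what would be sent. */
  static final class Stream {
    private final boolean requestBatchedGetWorkResponse;
    final List<Request> sent = new ArrayList<>();

    Stream(boolean requestBatchedGetWorkResponse) {
      this.requestBatchedGetWorkResponse = requestBatchedGetWorkResponse;
    }

    // The flag is sent exactly once, with the initial request.
    void start(long maxItems, long maxBytes) {
      sent.add(new Request(false, maxItems, maxBytes, requestBatchedGetWorkResponse));
    }

    // Extensions only extend the budget; the flag is left unset.
    void sendRequestExtension(long moreItems, long moreBytes) {
      sent.add(new Request(true, moreItems, moreBytes, null));
    }

    void sendHealthCheck() {
      sent.add(HEALTH_CHECK_REQUEST);
    }
  }

  public static void main(String[] args) {
    Stream stream = new Stream(true);
    stream.start(100, 1 << 20);
    stream.sendRequestExtension(10, 1024);
    stream.sendHealthCheck();
    for (Request r : stream.sent) {
      System.out.println(
          (r.isExtension ? "extension" : "initial")
              + " flag="
              + r.supportsMultipleWorkItemsInChunk);
    }
  }
}
```

With this shape the stream no longer needs to rebuild the health-check message per call, which is what allows going back to the constant.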