scwhittle commented on code in PR #33512:
URL: https://github.com/apache/beam/pull/33512#discussion_r1908660992


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java:
##########
@@ -161,7 +161,7 @@ private Windmill.StreamingGetWorkResponseChunk 
createResponse(Windmill.WorkItem
                 .setInputDataWatermark(1L)
                 .setDependentRealtimeInputWatermark(1L)
                 .build())
-        .setSerializedWorkItem(workItem.toByteString())
+        .addSerializedWorkItem(workItem.toByteString())

Review Comment:
   add a test with batched items?



##########
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto:
##########
@@ -763,6 +763,8 @@ message StreamingGetWorkRequest {
     GetWorkRequest request = 1;

Review Comment:
   // Initial message is GetWorkRequest with subsequent messages being 
extensions.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -184,6 +182,7 @@ private void maybeSendRequestExtension(GetWorkBudget 
extension) {
                         Windmill.StreamingGetWorkRequestExtension.newBuilder()
                             .setMaxItems(extension.items())
                             .setMaxBytes(extension.bytes()))
+                    
.setSupportsMultipleWorkItemsInChunk(multipleItemsInGetWorkResponse)

Review Comment:
   not sure we need to set it on the extesnsion



##########
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto:
##########
@@ -763,6 +763,8 @@ message StreamingGetWorkRequest {
     GetWorkRequest request = 1;
     StreamingGetWorkRequestExtension request_extension = 2;
   }
+
+  optional bool supports_multiple_work_items_in_chunk = 5 [default = false];

Review Comment:
   // Ignored after initial request.



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -136,6 +136,11 @@ public interface DataflowStreamingPipelineOptions extends 
PipelineOptions {
 
   void setUseSeparateWindmillHeartbeatStreams(Boolean value);
 
+  @Description("If true, GetWorkStreams will request multiple work items in a 
response chunk.")
+  boolean getWindmillMultipleItemsInGetWorkResponse();

Review Comment:
   nit: how about getWindmillRequestBatchedGetWorkResponse?
   
   Batched is shorter than MultipleItemsIn, and Request indicates that client 
is asking for it.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -88,6 +80,8 @@ final class GrpcDirectGetWorkStream
    */
   private final ConcurrentMap<Long, GetWorkResponseChunkAssembler> 
workItemAssemblers;
 
+  private final boolean multipleItemsInGetWorkResponse;

Review Comment:
   ditto on naming throughout



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java:
##########
@@ -115,6 +120,7 @@ private void sendRequestExtension(long moreItems, long 
moreBytes) {
                 StreamingGetWorkRequestExtension.newBuilder()
                     .setMaxItems(moreItems)
                     .setMaxBytes(moreBytes))
+            
.setSupportsMultipleWorkItemsInChunk(multipleItemsInGetWorkResponse)

Review Comment:
   ditto just on initial request?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -231,7 +231,15 @@ public void appendSpecificHtml(PrintWriter writer) {
 
   @Override
   public void sendHealthCheck() throws WindmillStreamShutdownException {
-    trySend(HEALTH_CHECK_REQUEST);
+    trySend(
+        StreamingGetWorkRequest.newBuilder()
+            .setRequestExtension(
+                Windmill.StreamingGetWorkRequestExtension.newBuilder()
+                    .setMaxItems(0)
+                    .setMaxBytes(0)
+                    .build())
+            
.setSupportsMultipleWorkItemsInChunk(multipleItemsInGetWorkResponse)

Review Comment:
   ditto (and could revert to constant)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to