scwhittle commented on code in PR #28016:
URL: https://github.com/apache/beam/pull/28016#discussion_r1295610601


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java:
##########
@@ -360,69 +343,34 @@ private <ResponseT> ResponseT callWithBackoff(Supplier<ResponseT> function) {
 
   @Override
   public GetWorkResponse getWork(GetWorkRequest request) {
-    if (syncApplianceStub == null) {
-      return callWithBackoff(
-          () ->
-              syncStub()
-                  .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
-                  .getWork(
-                      request
-                          .toBuilder()
-                          .setJobId(options.getJobId())
-                          .setProjectId(options.getProject())
-                          .setWorkerId(options.getWorkerId())
-                          .build()));
-    } else {
-      return callWithBackoff(
-          () ->
-              syncApplianceStub
-                  .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
-                  .getWork(request));
+    if (syncApplianceStub != null) {
+      return callWithBackoff(() -> syncApplianceStub.getWork(request));
     }
+
+    throw new RpcException(
+        new UnsupportedOperationException(
+            "Unary GetWork calls are not supported with Windmill service."));

Review Comment:
   The Dataflow docs refer to this as Streaming Engine; can you update the message here
   and below? Some of this wording predates your change, but let's make it clearer.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java:
##########
@@ -242,17 +239,17 @@ private synchronized void initializeLocalHost(int port) {
     Channel channel = localhostChannel(port);
     if (options.isEnableStreamingEngine()) {
       this.stubList.add(CloudWindmillServiceV1Alpha1Grpc.newStub(channel));
-      this.syncStubList.add(CloudWindmillServiceV1Alpha1Grpc.newBlockingStub(channel));
     } else {
-      this.syncApplianceStub = WindmillApplianceGrpc.newBlockingStub(channel);
+      this.syncApplianceStub =
+          WindmillApplianceGrpc.newBlockingStub(channel)
+              .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS);

Review Comment:
   Can unaryDeadlineSeconds be removed if the stub remembers the deadline now?
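
One caveat on "the stub remembers it": as far as I understand grpc-java, `withDeadlineAfter` fixes an absolute deadline at the moment it is called, so a stub stored with a deadline would not get a fresh deadline per RPC. The sketch below only models that behavior with plain JDK types. `DeadlineStubSketch`, its methods, and the 300-second value are hypothetical stand-ins for illustration, not the gRPC API or the actual `unaryDeadlineSeconds` setting.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical stand-in for a stub; it only models how a deadline is stored. */
final class DeadlineStubSketch {
  // Absolute point in time after which calls fail; null means no deadline.
  private final Instant deadline;

  private DeadlineStubSketch(Instant deadline) {
    this.deadline = deadline;
  }

  static DeadlineStubSketch create() {
    return new DeadlineStubSketch(null);
  }

  /**
   * Returns a new stub whose deadline is {@code timeout} after {@code now}.
   * The absolute instant is computed here, once, not when a call is made.
   */
  DeadlineStubSketch withDeadlineAfter(Duration timeout, Instant now) {
    return new DeadlineStubSketch(now.plus(timeout));
  }

  /** True if a call issued at {@code callTime} is already past the deadline. */
  boolean isExpiredAt(Instant callTime) {
    return deadline != null && callTime.isAfter(deadline);
  }
}
```

Under this model, a stub configured once at initialization accepts calls only until the timeout elapses from initialization, while a stub derived per call gets a new deadline each time. If the real stub behaves the same way, the per-call `withDeadlineAfter` (and therefore `unaryDeadlineSeconds`) may still be needed.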
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/grpcclient/GrpcWindmillServer.java:
##########
@@ -480,30 +428,26 @@ public CommitWorkStream commitWorkStream() {
 
   @Override
   public GetConfigResponse getConfig(GetConfigRequest request) {
-    if (syncApplianceStub == null) {
-      throw new RpcException(
-          new UnsupportedOperationException("GetConfig not supported with windmill service."));
-    } else {
-      return callWithBackoff(
-          () ->
-              syncApplianceStub
-                  .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)
-                  .getConfig(request));
+    if (syncApplianceStub != null) {
+      return callWithBackoff(() -> syncApplianceStub.getConfig(request));
     }
+
+    throw new RpcException(
+        new UnsupportedOperationException("GetConfig not supported with windmill service."));
   }
 
   @Override
   public ReportStatsResponse reportStats(ReportStatsRequest request) {
-    if (syncApplianceStub == null) {
-      throw new RpcException(
-          new UnsupportedOperationException("ReportStats not supported with windmill service."));
-    } else {
+    if (syncApplianceStub != null) {
       return callWithBackoff(
           () ->
               syncApplianceStub
                   .withDeadlineAfter(unaryDeadlineSeconds, TimeUnit.SECONDS)

Review Comment:
   Remove the per-call deadline here too, as was done for getConfig?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.