edman124 commented on code in PR #29879:
URL: https://github.com/apache/beam/pull/29879#discussion_r1441944806


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -269,4 +277,42 @@ public WorkItemServiceState 
reportWorkItemStatus(WorkItemStatus workItemStatus)
     logger.debug("ReportWorkItemStatus result: {}", state);
     return state;
   }
+
+  /** Reports the autoscaling signals to dataflow */
+  @Override
+  public WorkerMessageResponse reportWorkerMessage(StreamingScalingReport 
report)
+      throws IOException {
+    DateTime endTime = DateTime.now();
+    logger.debug("Reporting WorkMessageResponse");
+    Map<String, String> labels =
+        new HashMap<String, String>(
+            ImmutableMap.of("JOB_ID", options.getJobId(), "WORKER_ID", 
options.getWorkerId()));
+    WorkerMessage msg =
+        new WorkerMessage()
+            .setTime(toCloudTime(endTime))
+            .setStreamingScalingReport(report)
+            .setLabels(labels);
+    SendWorkerMessagesRequest request =
+        new SendWorkerMessagesRequest()
+            .setLocation(options.getRegion())
+            .setWorkerMessages(Collections.singletonList(msg));
+    SendWorkerMessagesResponse result =
+        dataflow
+            .projects()
+            .locations()
+            .workerMessages(options.getProject(), options.getRegion(), request)
+            .execute();
+    if (result == null) {
+      logger.warn("Worker Message response is null");
+      throw new IOException("Got null Worker Message response");
+    }
+
+    // Currently no response is expected
+    if (result.getWorkerMessageResponses() == null) {
+      return new WorkerMessageResponse();
+    }
+    WorkerMessageResponse response = result.getWorkerMessageResponses().get(0);

Review Comment:
   Leaving it like this temporarily since this quarter there is work to 
populate a response and process it on the legacy worker side



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to