scwhittle commented on code in PR #29879:
URL: https://github.com/apache/beam/pull/29879#discussion_r1440596051


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1747,6 +1748,14 @@ private void updateThreadMetrics() {
     maxOutstandingBundles.addValue(workUnitExecutor.maximumElementsOutstanding());
   }
 
+  private void sendWorkerMessage() throws IOException {
+    LOG.info("[chengedward] creating StreamingScalingReport");

Review Comment:
   make sure to remove debug logs



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1758,6 +1767,13 @@ public void reportPeriodicWorkerUpdates() {
     } catch (Exception e) {
      LOG.error("Unexpected exception while trying to send counter updates", e);
     }
+    try {
+      sendWorkerMessage();

Review Comment:
   maybe do this from a separate periodic method rather than the existing method sending the counters?
   That way the latency of reporting this message doesn't hold up counters, which can affect watermark latency in appliance.
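   The decoupling suggested above could look roughly like the following Java sketch. The class and wiring here (SeparateReporterSketch, the intervals, the counters) are illustrative only, not from the PR; the point is simply that each report runs on its own schedule, so a slow worker-message RPC delays only the next worker message, never the counter updates:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: run worker-message reporting on a separate periodic task from
// the counter-update path, so neither can hold the other up.
public class SeparateReporterSketch {
  private final ScheduledExecutorService scheduler =
      Executors.newScheduledThreadPool(2);
  final AtomicInteger counterReports = new AtomicInteger();
  final AtomicInteger workerMessages = new AtomicInteger();

  void start() {
    // Existing counter path keeps its own cadence.
    scheduler.scheduleWithFixedDelay(
        this::reportPeriodicWorkerUpdates, 0, 10, TimeUnit.SECONDS);
    // Worker messages go on an independent schedule; a slow RPC here
    // only delays the next worker message, not the counters.
    scheduler.scheduleWithFixedDelay(
        this::sendWorkerMessage, 0, 30, TimeUnit.SECONDS);
  }

  // Stand-ins for the PR's methods; here they just count invocations.
  void reportPeriodicWorkerUpdates() { counterReports.incrementAndGet(); }
  void sendWorkerMessage() { workerMessages.incrementAndGet(); }

  void stop() { scheduler.shutdownNow(); }
}
```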



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -269,4 +277,42 @@ public WorkItemServiceState reportWorkItemStatus(WorkItemStatus workItemStatus)
     logger.debug("ReportWorkItemStatus result: {}", state);
     return state;
   }
+
+  /** Reports the autoscaling signals to dataflow */
+  @Override
+  public WorkerMessageResponse reportWorkerMessage(StreamingScalingReport report)
+      throws IOException {
+    DateTime endTime = DateTime.now();
+    logger.debug("Reporting WorkMessageResponse");
+    Map<String, String> labels =
+        new HashMap<String, String>(
+            ImmutableMap.of("JOB_ID", options.getJobId(), "WORKER_ID", options.getWorkerId()));
+    WorkerMessage msg =
+        new WorkerMessage()
+            .setTime(toCloudTime(endTime))
+            .setStreamingScalingReport(report)
+            .setLabels(labels);
+    SendWorkerMessagesRequest request =
+        new SendWorkerMessagesRequest()
+            .setLocation(options.getRegion())
+            .setWorkerMessages(Collections.singletonList(msg));
+    SendWorkerMessagesResponse result =
+        dataflow
+            .projects()
+            .locations()
+            .workerMessages(options.getProject(), options.getRegion(), request)
+            .execute();
+    if (result == null) {
+      logger.warn("Worker Message response is null");
+      throw new IOException("Got null Worker Message response");
+    }
+
+    // Currently no response is expected
+    if (result.getWorkerMessageResponses() == null) {
+      return new WorkerMessageResponse();
+    }
+    WorkerMessageResponse response = result.getWorkerMessageResponses().get(0);

Review Comment:
   verify there are either 1 or 0 responses? This will just ignore everything after the first, which could be confusing.
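   One way to implement the suggested check, sketched with plain strings standing in for the generated WorkerMessageResponse type (the helper name pickSingleResponse is illustrative, not from the PR):

```java
import java.util.List;

// Sketch: instead of blindly taking get(0), fail loudly if the service
// returns more than one response for the single message we sent.
public class ResponseCheckSketch {
  static String pickSingleResponse(List<String> responses) {
    if (responses == null || responses.isEmpty()) {
      return null; // currently no response is expected
    }
    if (responses.size() > 1) {
      throw new IllegalStateException(
          "Expected at most one WorkerMessageResponse, got " + responses.size());
    }
    return responses.get(0);
  }
}
```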



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -269,4 +277,42 @@ public WorkItemServiceState reportWorkItemStatus(WorkItemStatus workItemStatus)
     logger.debug("ReportWorkItemStatus result: {}", state);
     return state;
   }
+
+  /** Reports the autoscaling signals to dataflow */
+  @Override
+  public WorkerMessageResponse reportWorkerMessage(StreamingScalingReport report)
+      throws IOException {
+    DateTime endTime = DateTime.now();
+    logger.debug("Reporting WorkMessageResponse");
+    Map<String, String> labels =
+        new HashMap<String, String>(

Review Comment:
   rm new HashMap , that copies entries from the immutable map but then will be 
mutable.  you can just use the immutable map
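   A minimal illustration of the point, using java.util.Map.of as a stdlib stand-in for Guava's ImmutableMap.of (the setLabels call in the diff takes a Map, so the immutable map can be passed as-is; buildLabels is a hypothetical helper):

```java
import java.util.Map;

// Sketch: wrapping an immutable map in new HashMap<>() copies the entries
// and makes the result mutable again. Returning the immutable map directly
// avoids the copy and keeps it read-only.
public class LabelsSketch {
  static Map<String, String> buildLabels(String jobId, String workerId) {
    return Map.of("JOB_ID", jobId, "WORKER_ID", workerId);
  }
}
```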



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1758,6 +1767,13 @@ public void reportPeriodicWorkerUpdates() {
     } catch (Exception e) {
      LOG.error("Unexpected exception while trying to send counter updates", e);
     }
+    try {
+      sendWorkerMessage();

Review Comment:
   do we want to do this for appliance or just streaming engine?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -269,4 +277,42 @@ public WorkItemServiceState reportWorkItemStatus(WorkItemStatus workItemStatus)
     logger.debug("ReportWorkItemStatus result: {}", state);
     return state;
   }
+
+  /** Reports the autoscaling signals to dataflow */
+  @Override
+  public WorkerMessageResponse reportWorkerMessage(StreamingScalingReport report)

Review Comment:
   if this stays as is, the method should be renamed. But I think it would be better if we can combine it with other data sent in the same RPC.
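   A rough sketch of the "combine into one RPC" idea, with plain strings standing in for the WorkerMessage payload types and a hypothetical helper name; the request already takes a list of messages, so whatever payloads are ready can travel in a single call:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: gather every per-interval payload that is ready (the scaling
// report plus any future additions) and send them in one request, so the
// reporting method stays generic rather than scaling-report-specific.
public class BatchedWorkerMessagesSketch {
  static List<String> buildWorkerMessages(String scalingReport, String otherPayload) {
    List<String> messages = new ArrayList<>();
    if (scalingReport != null) {
      messages.add(scalingReport);
    }
    if (otherPayload != null) {
      messages.add(otherPayload);
    }
    return messages; // one request carries all of them
  }
}
```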



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
