scwhittle commented on code in PR #29879:
URL: https://github.com/apache/beam/pull/29879#discussion_r1440596051
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1747,6 +1748,14 @@ private void updateThreadMetrics() {
maxOutstandingBundles.addValue(workUnitExecutor.maximumElementsOutstanding());
}
+ private void sendWorkerMessage() throws IOException {
+ LOG.info("[chengedward] creating StreamingScalingReport");
Review Comment:
Make sure to remove debug logs.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1758,6 +1767,13 @@ public void reportPeriodicWorkerUpdates() {
} catch (Exception e) {
LOG.error("Unexpected exception while trying to send counter updates",
e);
}
+ try {
+ sendWorkerMessage();
Review Comment:
Maybe do this from a separate periodic method rather than the existing method
that sends the counters? That way the latency of reporting this message doesn't
hold up the counter updates, which can affect watermark latency in appliance.
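A minimal sketch of the suggestion above, assuming two independent single-threaded schedulers. Every name here (`SeparatePeriodicReporters`, `counterTimer`, `workerMessageTimer`, `demo`) is hypothetical and not taken from `StreamingDataflowWorker`; the `Thread.sleep` only stands in for a slow worker-message RPC.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: counters and worker messages are reported on separate timers. */
final class SeparatePeriodicReporters {
  final AtomicInteger counterReports = new AtomicInteger();
  final AtomicInteger workerMessageReports = new AtomicInteger();

  // One scheduler per report type, so a slow call on one cannot delay the other.
  private final ScheduledExecutorService counterTimer =
      Executors.newSingleThreadScheduledExecutor();
  private final ScheduledExecutorService workerMessageTimer =
      Executors.newSingleThreadScheduledExecutor();

  void start(long periodMillis, long workerMessageLatencyMillis) {
    // Stand-in for sending counter updates.
    counterTimer.scheduleWithFixedDelay(
        counterReports::incrementAndGet, 0, periodMillis, TimeUnit.MILLISECONDS);
    // Stand-in for sending the worker message; the sleep simulates RPC latency.
    workerMessageTimer.scheduleWithFixedDelay(
        () -> {
          try {
            Thread.sleep(workerMessageLatencyMillis);
            workerMessageReports.incrementAndGet();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        },
        0,
        periodMillis,
        TimeUnit.MILLISECONDS);
  }

  void stop() {
    counterTimer.shutdownNow();
    workerMessageTimer.shutdownNow();
  }

  /** Runs both reporters briefly and returns {counterReports, workerMessageReports}. */
  static int[] demo() {
    SeparatePeriodicReporters reporters = new SeparatePeriodicReporters();
    reporters.start(10, 5_000);
    try {
      Thread.sleep(300);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      reporters.stop();
    }
    return new int[] {reporters.counterReports.get(), reporters.workerMessageReports.get()};
  }
}
```

In `demo()`, the counter task keeps firing on its own timer while the simulated worker-message call is still blocked, which is the isolation the comment asks for.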
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -269,4 +277,42 @@ public WorkItemServiceState
reportWorkItemStatus(WorkItemStatus workItemStatus)
logger.debug("ReportWorkItemStatus result: {}", state);
return state;
}
+
+ /** Reports the autoscaling signals to dataflow */
+ @Override
+ public WorkerMessageResponse reportWorkerMessage(StreamingScalingReport
report)
+ throws IOException {
+ DateTime endTime = DateTime.now();
+ logger.debug("Reporting WorkMessageResponse");
+ Map<String, String> labels =
+ new HashMap<String, String>(
+ ImmutableMap.of("JOB_ID", options.getJobId(), "WORKER_ID",
options.getWorkerId()));
+ WorkerMessage msg =
+ new WorkerMessage()
+ .setTime(toCloudTime(endTime))
+ .setStreamingScalingReport(report)
+ .setLabels(labels);
+ SendWorkerMessagesRequest request =
+ new SendWorkerMessagesRequest()
+ .setLocation(options.getRegion())
+ .setWorkerMessages(Collections.singletonList(msg));
+ SendWorkerMessagesResponse result =
+ dataflow
+ .projects()
+ .locations()
+ .workerMessages(options.getProject(), options.getRegion(), request)
+ .execute();
+ if (result == null) {
+ logger.warn("Worker Message response is null");
+ throw new IOException("Got null Worker Message response");
+ }
+
+ // Currently no response is expected
+ if (result.getWorkerMessageResponses() == null) {
+ return new WorkerMessageResponse();
+ }
+ WorkerMessageResponse response = result.getWorkerMessageResponses().get(0);
Review Comment:
Verify that there are either 0 or 1 responses? As written, this silently
ignores everything after the first response, which could be confusing.
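One way the check could look, sketched as a generic helper so it stays self-contained. `SingleResponse` and `atMostOne` are hypothetical names, and throwing `IOException` on extra responses is an assumption chosen to match the method's existing `throws IOException`; logging a warning instead would also address the comment.

```java
import java.io.IOException;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper: accept zero or one response, reject anything more. */
final class SingleResponse {
  private SingleResponse() {}

  static <T> Optional<T> atMostOne(List<T> responses) throws IOException {
    // A null or empty list means the service returned no response.
    if (responses == null || responses.isEmpty()) {
      return Optional.empty();
    }
    // Fail loudly rather than silently dropping everything after the first entry.
    if (responses.size() > 1) {
      throw new IOException(
          "Expected at most one worker message response but got " + responses.size());
    }
    return Optional.ofNullable(responses.get(0));
  }
}
```

The caller could then return `atMostOne(result.getWorkerMessageResponses()).orElseGet(WorkerMessageResponse::new)` to keep the current behavior for the empty case.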
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -269,4 +277,42 @@ public WorkItemServiceState
reportWorkItemStatus(WorkItemStatus workItemStatus)
logger.debug("ReportWorkItemStatus result: {}", state);
return state;
}
+
+ /** Reports the autoscaling signals to dataflow */
+ @Override
+ public WorkerMessageResponse reportWorkerMessage(StreamingScalingReport
report)
+ throws IOException {
+ DateTime endTime = DateTime.now();
+ logger.debug("Reporting WorkMessageResponse");
+ Map<String, String> labels =
+ new HashMap<String, String>(
Review Comment:
Remove the new HashMap: it copies the entries from the immutable map into a
map that is then mutable. You can just use the immutable map directly.
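To illustrate the difference, here is a small sketch that uses the JDK's `Map.of` (Java 9+) as a stand-in for Guava's `ImmutableMap.of`, so that it runs without extra dependencies. `LabelsExample` and its methods are hypothetical names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch comparing an immutable label map with a HashMap copy of it. */
final class LabelsExample {
  private LabelsExample() {}

  // Stand-in for ImmutableMap.of("JOB_ID", ..., "WORKER_ID", ...).
  static Map<String, String> immutableLabels(String jobId, String workerId) {
    return Map.of("JOB_ID", jobId, "WORKER_ID", workerId);
  }

  // What the PR currently does: copy the immutable entries into a mutable HashMap.
  static Map<String, String> copiedLabels(String jobId, String workerId) {
    return new HashMap<>(immutableLabels(jobId, workerId));
  }

  // Returns true if the map accepts a put, false if it rejects modification.
  static boolean isMutable(Map<String, String> labels) {
    try {
      labels.put("EXTRA", "x");
      return true;
    } catch (UnsupportedOperationException e) {
      return false;
    }
  }
}
```

Both maps hold the same entries; only the copy accepts later modification, which nothing in the method needs.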
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1758,6 +1767,13 @@ public void reportPeriodicWorkerUpdates() {
} catch (Exception e) {
LOG.error("Unexpected exception while trying to send counter updates",
e);
}
+ try {
+ sendWorkerMessage();
Review Comment:
Do we want to do this for appliance as well, or just for Streaming Engine?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -269,4 +277,42 @@ public WorkItemServiceState
reportWorkItemStatus(WorkItemStatus workItemStatus)
logger.debug("ReportWorkItemStatus result: {}", state);
return state;
}
+
+ /** Reports the autoscaling signals to dataflow */
+ @Override
+ public WorkerMessageResponse reportWorkerMessage(StreamingScalingReport
report)
Review Comment:
If this stays as is, the method should be renamed. But I think it would be
better if we can combine this with other data sent in the same RPC.