This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fb0762352c2 Periodically send perWorkerMetrics on the WorkerMessages 
API (#30135)
fb0762352c2 is described below

commit fb0762352c20ac50f182facdd82d80f4ecfaa052
Author: JayajP <[email protected]>
AuthorDate: Wed Jan 31 14:21:12 2024 -0800

    Periodically send perWorkerMetrics on the WorkerMessages API (#30135)
---
 .../dataflow/worker/DataflowWorkUnitClient.java    | 24 +++++++++++--
 .../dataflow/worker/StreamingDataflowWorker.java   | 40 +++++++++++++++++++---
 .../worker/StreamingStepMetricsContainer.java      |  8 ++++-
 .../runners/dataflow/worker/WorkUnitClient.java    | 17 +++++++--
 .../dataflow/worker/streaming/StageInfo.java       |  9 +++++
 .../worker/DataflowWorkUnitClientTest.java         | 37 ++++++++++++++++++--
 6 files changed, 122 insertions(+), 13 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
index bb39e3bd9af..f3caa8d0f3a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
@@ -29,6 +29,7 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Mo
 import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
 import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
+import com.google.api.services.dataflow.model.PerWorkerMetrics;
 import com.google.api.services.dataflow.model.ReportWorkItemStatusRequest;
 import com.google.api.services.dataflow.model.ReportWorkItemStatusResponse;
 import com.google.api.services.dataflow.model.SendWorkerMessagesRequest;
@@ -292,13 +293,30 @@ class DataflowWorkUnitClient implements WorkUnitClient {
     return msg;
   }
 
-  /** Reports the autoscaling signals to dataflow */
   @Override
-  public void reportWorkerMessage(WorkerMessage msg) throws IOException {
+  public WorkerMessage 
createWorkerMessageFromPerWorkerMetrics(PerWorkerMetrics report) {
+    DateTime endTime = DateTime.now();
+    logger.debug("Reporting WorkMessageResponse");
+    Map<String, String> labels =
+        ImmutableMap.of("JOB_ID", options.getJobId(), "WORKER_ID", 
options.getWorkerId());
+    WorkerMessage msg =
+        new WorkerMessage()
+            .setTime(toCloudTime(endTime))
+            .setPerWorkerMetrics(report)
+            .setLabels(labels);
+    return msg;
+  }
+
+  /**
+   * Reports the worker messages to dataflow. We currently report autoscaling 
signals and
+   * PerWorkerMetrics with this path.
+   */
+  @Override
+  public void reportWorkerMessage(List<WorkerMessage> messages) throws 
IOException {
     SendWorkerMessagesRequest request =
         new SendWorkerMessagesRequest()
             .setLocation(options.getRegion())
-            .setWorkerMessages(Collections.singletonList(msg));
+            .setWorkerMessages(messages);
     SendWorkerMessagesResponse result =
         dataflow
             .projects()
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index d915b77995d..f37504692d3 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -23,12 +23,15 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 
 import com.google.api.services.dataflow.model.CounterUpdate;
 import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.PerStepNamespaceMetrics;
+import com.google.api.services.dataflow.model.PerWorkerMetrics;
 import com.google.api.services.dataflow.model.Status;
 import com.google.api.services.dataflow.model.StreamingComputationConfig;
 import com.google.api.services.dataflow.model.StreamingConfigTask;
 import com.google.api.services.dataflow.model.StreamingScalingReport;
 import com.google.api.services.dataflow.model.WorkItem;
 import com.google.api.services.dataflow.model.WorkItemStatus;
+import com.google.api.services.dataflow.model.WorkerMessage;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
 import java.io.IOException;
@@ -117,6 +120,7 @@ import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.JvmInitializers;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.BackOff;
@@ -486,10 +490,13 @@ public class StreamingDataflowWorker {
     // metrics.
     MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
 
-    // When enabled, the Pipeline will record Per-Worker metrics that will be 
piped to WMW.
+    // When enabled, the Pipeline will record Per-Worker metrics that will be 
piped to DFE.
     StreamingStepMetricsContainer.setEnablePerWorkerMetrics(
         options.isEnableStreamingEngine()
             && DataflowRunner.hasExperiment(options, 
"enable_per_worker_metrics"));
+    // StreamingStepMetricsContainer automatically deletes perWorkerCounters 
if they are zero-valued
+    // for longer than 5 minutes.
+    BigQuerySinkMetrics.setSupportMetricsDeletion(true);
 
     JvmInitializers.runBeforeProcessing(options);
     worker.startStatusPages();
@@ -1777,7 +1784,7 @@ public class StreamingDataflowWorker {
     maxOutstandingBundles.addValue((long) 
workUnitExecutor.maximumElementsOutstanding());
   }
 
-  private void sendWorkerMessage() throws IOException {
+  private WorkerMessage createWorkerMessageForStreamingScalingReport() {
     StreamingScalingReport activeThreadsReport =
         new StreamingScalingReport()
             .setActiveThreadCount(workUnitExecutor.activeCount())
@@ -1786,8 +1793,33 @@ public class StreamingDataflowWorker {
             .setMaximumThreadCount(chooseMaximumNumberOfThreads())
             
.setMaximumBundleCount(workUnitExecutor.maximumElementsOutstanding())
             .setMaximumBytes(workUnitExecutor.maximumBytesOutstanding());
-    workUnitClient.reportWorkerMessage(
-        
workUnitClient.createWorkerMessageFromStreamingScalingReport(activeThreadsReport));
+    return 
workUnitClient.createWorkerMessageFromStreamingScalingReport(activeThreadsReport);
+  }
+
+  private Optional<WorkerMessage> createWorkerMessageForPerWorkerMetrics() {
+    List<PerStepNamespaceMetrics> metrics = new ArrayList<>();
+    stageInfoMap.values().forEach(s -> 
metrics.addAll(s.extractPerWorkerMetricValues()));
+
+    if (metrics.isEmpty()) {
+      return Optional.empty();
+    }
+
+    PerWorkerMetrics perWorkerMetrics = new 
PerWorkerMetrics().setPerStepNamespaceMetrics(metrics);
+    return 
Optional.of(workUnitClient.createWorkerMessageFromPerWorkerMetrics(perWorkerMetrics));
+  }
+
+  private void sendWorkerMessage() throws IOException {
+    List<WorkerMessage> workerMessages = new ArrayList<WorkerMessage>(2);
+    workerMessages.add(createWorkerMessageForStreamingScalingReport());
+
+    if (StreamingStepMetricsContainer.getEnablePerWorkerMetrics()) {
+      Optional<WorkerMessage> metricsMsg = 
createWorkerMessageForPerWorkerMetrics();
+      if (metricsMsg.isPresent()) {
+        workerMessages.add(metricsMsg.get());
+      }
+    }
+
+    workUnitClient.reportWorkerMessage(workerMessages);
   }
 
   @VisibleForTesting
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
index af3b3e51071..54a3ef49776 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
@@ -98,7 +98,10 @@ public class StreamingStepMetricsContainer implements 
MetricsContainer {
     };
   }
 
-  // Testing constructor.
+  /**
+   * Construct a {@code StreamingStepMetricsContainer} that supports mock 
clock, perWorkerCounters,
+   * and perWorkerCountersByFirstStaleTime. For testing purposes only.
+   */
   private StreamingStepMetricsContainer(
       String stepName,
       Map<MetricName, Instant> perWorkerCountersByFirstStaleTime,
@@ -218,6 +221,9 @@ public class StreamingStepMetricsContainer implements 
MetricsContainer {
     StreamingStepMetricsContainer.enablePerWorkerMetrics = 
enablePerWorkerMetrics;
   }
 
+  public static boolean getEnablePerWorkerMetrics() {
+    return StreamingStepMetricsContainer.enablePerWorkerMetrics;
+  }
   /**
    * Updates {@code perWorkerCountersByFirstStaleTime} with the current 
zero-valued metrics and
    * removes metrics that have been stale for longer than {@code 
maximumPerWorkerCounterStaleness}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
index 6f26f404807..f9637a375ed 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
@@ -17,12 +17,14 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
+import com.google.api.services.dataflow.model.PerWorkerMetrics;
 import com.google.api.services.dataflow.model.StreamingScalingReport;
 import com.google.api.services.dataflow.model.WorkItem;
 import com.google.api.services.dataflow.model.WorkItemServiceState;
 import com.google.api.services.dataflow.model.WorkItemStatus;
 import com.google.api.services.dataflow.model.WorkerMessage;
 import java.io.IOException;
+import java.util.List;
 import java.util.Optional;
 
 /** Abstract base class describing a client for WorkItem work units. */
@@ -61,9 +63,18 @@ interface WorkUnitClient {
   WorkerMessage 
createWorkerMessageFromStreamingScalingReport(StreamingScalingReport report);
 
   /**
-   * Reports the autoscaling signals with a {@link StreamingScalingReport}.
+   * Creates a {@link WorkerMessage} containing the given PerWorkerMetrics.
    *
-   * @param msg the WorkerMessage to report
+   * @param metrics Metric updates to send to the backend.
+   * @return a {@link WorkerMessage}
+   */
+  WorkerMessage createWorkerMessageFromPerWorkerMetrics(PerWorkerMetrics 
metrics);
+
+  /**
+   * Reports the worker messages to dataflow. We currently report autoscaling 
signals and
+   * PerWorkerMetrics with this path.
+   *
+   * @param messages the WorkerMessages to report
    */
-  void reportWorkerMessage(WorkerMessage msg) throws IOException;
+  void reportWorkerMessage(List<WorkerMessage> messages) throws IOException;
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
index b514dfc84bb..64c97dcac51 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.THRO
 
 import com.google.api.services.dataflow.model.CounterStructuredName;
 import com.google.api.services.dataflow.model.CounterUpdate;
+import com.google.api.services.dataflow.model.PerStepNamespaceMetrics;
 import com.google.auto.value.AutoValue;
 import java.util.ArrayList;
 import java.util.List;
@@ -111,4 +112,12 @@ public abstract class StageInfo {
       }
     }
   }
+
+  public List<PerStepNamespaceMetrics> extractPerWorkerMetricValues() {
+    List<PerStepNamespaceMetrics> metrics = new ArrayList<>();
+    Iterables.addAll(
+        metrics,
+        
StreamingStepMetricsContainer.extractPerWorkerMetricUpdates(metricsContainerRegistry()));
+    return metrics;
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
index 7019217b9fb..59d8c55c52d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
@@ -31,6 +31,9 @@ import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
 import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
 import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.MetricValue;
+import com.google.api.services.dataflow.model.PerStepNamespaceMetrics;
+import com.google.api.services.dataflow.model.PerWorkerMetrics;
 import com.google.api.services.dataflow.model.SendWorkerMessagesRequest;
 import com.google.api.services.dataflow.model.SendWorkerMessagesResponse;
 import com.google.api.services.dataflow.model.SeqMapTask;
@@ -38,6 +41,7 @@ import 
com.google.api.services.dataflow.model.StreamingScalingReport;
 import com.google.api.services.dataflow.model.WorkItem;
 import com.google.api.services.dataflow.model.WorkerMessage;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Optional;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
@@ -232,7 +236,7 @@ public class DataflowWorkUnitClientTest {
   }
 
   @Test
-  public void testReportWorkerMessage() throws Exception {
+  public void testReportWorkerMessage_streamingScalingReport() throws 
Exception {
     MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
     response.setContentType(Json.MEDIA_TYPE);
     SendWorkerMessagesResponse workerMessage = new 
SendWorkerMessagesResponse();
@@ -249,7 +253,7 @@ public class DataflowWorkUnitClientTest {
             .setMaximumBytes(6L);
     WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
     WorkerMessage msg = 
client.createWorkerMessageFromStreamingScalingReport(activeThreadsReport);
-    client.reportWorkerMessage(msg);
+    client.reportWorkerMessage(Collections.singletonList(msg));
 
     SendWorkerMessagesRequest actualRequest =
         Transport.getJsonFactory()
@@ -257,6 +261,35 @@ public class DataflowWorkUnitClientTest {
     assertEquals(ImmutableList.of(msg), actualRequest.getWorkerMessages());
   }
 
+  @Test
+  public void testReportWorkerMessage_perWorkerMetrics() throws Exception {
+    MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+    response.setContentType(Json.MEDIA_TYPE);
+    SendWorkerMessagesResponse workerMessage = new 
SendWorkerMessagesResponse();
+    workerMessage.setFactory(Transport.getJsonFactory());
+    response.setContent(workerMessage.toPrettyString());
+    when(request.execute()).thenReturn(response);
+    PerStepNamespaceMetrics stepNamespaceMetrics =
+        new PerStepNamespaceMetrics()
+            .setOriginalStep("s1")
+            .setMetricsNamespace("ns")
+            .setMetricValues(
+                Collections.singletonList(new 
MetricValue().setMetric("metric").setValueInt64(3L)));
+    PerWorkerMetrics perWorkerMetrics =
+        new PerWorkerMetrics()
+            
.setPerStepNamespaceMetrics(Collections.singletonList(stepNamespaceMetrics));
+
+    WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
+    WorkerMessage perWorkerMetricsMsg =
+        client.createWorkerMessageFromPerWorkerMetrics(perWorkerMetrics);
+    client.reportWorkerMessage(Collections.singletonList(perWorkerMetricsMsg));
+
+    SendWorkerMessagesRequest actualRequest =
+        Transport.getJsonFactory()
+            .fromString(request.getContentAsString(), 
SendWorkerMessagesRequest.class);
+    assertEquals(ImmutableList.of(perWorkerMetricsMsg), 
actualRequest.getWorkerMessages());
+  }
+
   private LowLevelHttpResponse generateMockResponse(WorkItem... workItems) 
throws Exception {
     MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
     response.setContentType(Json.MEDIA_TYPE);

Reply via email to