This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fb0762352c2 Periodically send perWorkerMetrics on the WorkerMessages
API (#30135)
fb0762352c2 is described below
commit fb0762352c20ac50f182facdd82d80f4ecfaa052
Author: JayajP <[email protected]>
AuthorDate: Wed Jan 31 14:21:12 2024 -0800
Periodically send perWorkerMetrics on the WorkerMessages API (#30135)
---
.../dataflow/worker/DataflowWorkUnitClient.java | 24 +++++++++++--
.../dataflow/worker/StreamingDataflowWorker.java | 40 +++++++++++++++++++---
.../worker/StreamingStepMetricsContainer.java | 8 ++++-
.../runners/dataflow/worker/WorkUnitClient.java | 17 +++++++--
.../dataflow/worker/streaming/StageInfo.java | 9 +++++
.../worker/DataflowWorkUnitClientTest.java | 37 ++++++++++++++++++--
6 files changed, 122 insertions(+), 13 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
index bb39e3bd9af..f3caa8d0f3a 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java
@@ -29,6 +29,7 @@ import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Mo
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
+import com.google.api.services.dataflow.model.PerWorkerMetrics;
import com.google.api.services.dataflow.model.ReportWorkItemStatusRequest;
import com.google.api.services.dataflow.model.ReportWorkItemStatusResponse;
import com.google.api.services.dataflow.model.SendWorkerMessagesRequest;
@@ -292,13 +293,30 @@ class DataflowWorkUnitClient implements WorkUnitClient {
return msg;
}
- /** Reports the autoscaling signals to dataflow */
@Override
- public void reportWorkerMessage(WorkerMessage msg) throws IOException {
+ public WorkerMessage
createWorkerMessageFromPerWorkerMetrics(PerWorkerMetrics report) {
+ DateTime endTime = DateTime.now();
+ logger.debug("Reporting WorkMessageResponse");
+ Map<String, String> labels =
+ ImmutableMap.of("JOB_ID", options.getJobId(), "WORKER_ID",
options.getWorkerId());
+ WorkerMessage msg =
+ new WorkerMessage()
+ .setTime(toCloudTime(endTime))
+ .setPerWorkerMetrics(report)
+ .setLabels(labels);
+ return msg;
+ }
+
+ /**
+ * Reports the worker messages to dataflow. We currently report autoscaling
signals and
+ * per-worker metrics with this path.
+ */
+ @Override
+ public void reportWorkerMessage(List<WorkerMessage> messages) throws
IOException {
SendWorkerMessagesRequest request =
new SendWorkerMessagesRequest()
.setLocation(options.getRegion())
- .setWorkerMessages(Collections.singletonList(msg));
+ .setWorkerMessages(messages);
SendWorkerMessagesResponse result =
dataflow
.projects()
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index d915b77995d..f37504692d3 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -23,12 +23,15 @@ import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.PerStepNamespaceMetrics;
+import com.google.api.services.dataflow.model.PerWorkerMetrics;
import com.google.api.services.dataflow.model.Status;
import com.google.api.services.dataflow.model.StreamingComputationConfig;
import com.google.api.services.dataflow.model.StreamingConfigTask;
import com.google.api.services.dataflow.model.StreamingScalingReport;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemStatus;
+import com.google.api.services.dataflow.model.WorkerMessage;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
@@ -117,6 +120,7 @@ import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.JvmInitializers;
import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.BackOff;
@@ -486,10 +490,13 @@ public class StreamingDataflowWorker {
// metrics.
MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null));
- // When enabled, the Pipeline will record Per-Worker metrics that will be
piped to WMW.
+ // When enabled, the Pipeline will record Per-Worker metrics that will be
piped to DFE.
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(
options.isEnableStreamingEngine()
&& DataflowRunner.hasExperiment(options,
"enable_per_worker_metrics"));
+ // StreamingStepMetricsContainer automatically deletes perWorkerCounters
if they are zero-valued
+ // for longer than 5 minutes.
+ BigQuerySinkMetrics.setSupportMetricsDeletion(true);
JvmInitializers.runBeforeProcessing(options);
worker.startStatusPages();
@@ -1777,7 +1784,7 @@ public class StreamingDataflowWorker {
maxOutstandingBundles.addValue((long)
workUnitExecutor.maximumElementsOutstanding());
}
- private void sendWorkerMessage() throws IOException {
+ private WorkerMessage createWorkerMessageForStreamingScalingReport() {
StreamingScalingReport activeThreadsReport =
new StreamingScalingReport()
.setActiveThreadCount(workUnitExecutor.activeCount())
@@ -1786,8 +1793,33 @@ public class StreamingDataflowWorker {
.setMaximumThreadCount(chooseMaximumNumberOfThreads())
.setMaximumBundleCount(workUnitExecutor.maximumElementsOutstanding())
.setMaximumBytes(workUnitExecutor.maximumBytesOutstanding());
- workUnitClient.reportWorkerMessage(
-
workUnitClient.createWorkerMessageFromStreamingScalingReport(activeThreadsReport));
+ return
workUnitClient.createWorkerMessageFromStreamingScalingReport(activeThreadsReport);
+ }
+
+ private Optional<WorkerMessage> createWorkerMessageForPerWorkerMetrics() {
+ List<PerStepNamespaceMetrics> metrics = new ArrayList<>();
+ stageInfoMap.values().forEach(s ->
metrics.addAll(s.extractPerWorkerMetricValues()));
+
+ if (metrics.isEmpty()) {
+ return Optional.empty();
+ }
+
+ PerWorkerMetrics perWorkerMetrics = new
PerWorkerMetrics().setPerStepNamespaceMetrics(metrics);
+ return
Optional.of(workUnitClient.createWorkerMessageFromPerWorkerMetrics(perWorkerMetrics));
+ }
+
+ private void sendWorkerMessage() throws IOException {
+ List<WorkerMessage> workerMessages = new ArrayList<WorkerMessage>(2);
+ workerMessages.add(createWorkerMessageForStreamingScalingReport());
+
+ if (StreamingStepMetricsContainer.getEnablePerWorkerMetrics()) {
+ Optional<WorkerMessage> metricsMsg =
createWorkerMessageForPerWorkerMetrics();
+ if (metricsMsg.isPresent()) {
+ workerMessages.add(metricsMsg.get());
+ }
+ }
+
+ workUnitClient.reportWorkerMessage(workerMessages);
}
@VisibleForTesting
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
index af3b3e51071..54a3ef49776 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
@@ -98,7 +98,10 @@ public class StreamingStepMetricsContainer implements
MetricsContainer {
};
}
- // Testing constructor.
+ /**
+ * Construct a {@code StreamingStepMetricsContainer} that supports mock
clock, perWorkerCounters,
+ * and perWorkerCountersByFirstStaleTime. For testing purposes only.
+ */
private StreamingStepMetricsContainer(
String stepName,
Map<MetricName, Instant> perWorkerCountersByFirstStaleTime,
@@ -218,6 +221,9 @@ public class StreamingStepMetricsContainer implements
MetricsContainer {
StreamingStepMetricsContainer.enablePerWorkerMetrics =
enablePerWorkerMetrics;
}
+ public static boolean getEnablePerWorkerMetrics() {
+ return StreamingStepMetricsContainer.enablePerWorkerMetrics;
+ }
/**
* Updates {@code perWorkerCountersByFirstStaleTime} with the current
zero-valued metrics and
* removes metrics that have been stale for longer than {@code
maximumPerWorkerCounterStaleness}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
index 6f26f404807..f9637a375ed 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkUnitClient.java
@@ -17,12 +17,14 @@
*/
package org.apache.beam.runners.dataflow.worker;
+import com.google.api.services.dataflow.model.PerWorkerMetrics;
import com.google.api.services.dataflow.model.StreamingScalingReport;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.api.services.dataflow.model.WorkerMessage;
import java.io.IOException;
+import java.util.List;
import java.util.Optional;
/** Abstract base class describing a client for WorkItem work units. */
@@ -61,9 +63,18 @@ interface WorkUnitClient {
WorkerMessage
createWorkerMessageFromStreamingScalingReport(StreamingScalingReport report);
/**
- * Reports the autoscaling signals with a {@link StreamingScalingReport}.
+ * Creates a {@link WorkerMessage} containing the given {@link PerWorkerMetrics}.
*
- * @param msg the WorkerMessage to report
+ * @param metrics Metric updates to send to the backend.
+ * @return a {@link WorkerMessage}
+ */
+ WorkerMessage createWorkerMessageFromPerWorkerMetrics(PerWorkerMetrics
metrics);
+
+ /**
+ * Reports the worker messages to dataflow. We currently report autoscaling
signals and
+ * per-worker metrics with this path.
+ *
+ * @param messages the WorkerMessages to report
*/
- void reportWorkerMessage(WorkerMessage msg) throws IOException;
+ void reportWorkerMessage(List<WorkerMessage> messages) throws IOException;
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
index b514dfc84bb..64c97dcac51 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java
@@ -21,6 +21,7 @@ import static
org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.THRO
import com.google.api.services.dataflow.model.CounterStructuredName;
import com.google.api.services.dataflow.model.CounterUpdate;
+import com.google.api.services.dataflow.model.PerStepNamespaceMetrics;
import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.List;
@@ -111,4 +112,12 @@ public abstract class StageInfo {
}
}
}
+
+ public List<PerStepNamespaceMetrics> extractPerWorkerMetricValues() {
+ List<PerStepNamespaceMetrics> metrics = new ArrayList<>();
+ Iterables.addAll(
+ metrics,
+
StreamingStepMetricsContainer.extractPerWorkerMetricUpdates(metricsContainerRegistry()));
+ return metrics;
+ }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
index 7019217b9fb..59d8c55c52d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClientTest.java
@@ -31,6 +31,9 @@ import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.MetricValue;
+import com.google.api.services.dataflow.model.PerStepNamespaceMetrics;
+import com.google.api.services.dataflow.model.PerWorkerMetrics;
import com.google.api.services.dataflow.model.SendWorkerMessagesRequest;
import com.google.api.services.dataflow.model.SendWorkerMessagesResponse;
import com.google.api.services.dataflow.model.SeqMapTask;
@@ -38,6 +41,7 @@ import
com.google.api.services.dataflow.model.StreamingScalingReport;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkerMessage;
import java.io.IOException;
+import java.util.Collections;
import java.util.Optional;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
@@ -232,7 +236,7 @@ public class DataflowWorkUnitClientTest {
}
@Test
- public void testReportWorkerMessage() throws Exception {
+ public void testReportWorkerMessage_streamingScalingReport() throws
Exception {
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
response.setContentType(Json.MEDIA_TYPE);
SendWorkerMessagesResponse workerMessage = new
SendWorkerMessagesResponse();
@@ -249,7 +253,7 @@ public class DataflowWorkUnitClientTest {
.setMaximumBytes(6L);
WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
WorkerMessage msg =
client.createWorkerMessageFromStreamingScalingReport(activeThreadsReport);
- client.reportWorkerMessage(msg);
+ client.reportWorkerMessage(Collections.singletonList(msg));
SendWorkerMessagesRequest actualRequest =
Transport.getJsonFactory()
@@ -257,6 +261,35 @@ public class DataflowWorkUnitClientTest {
assertEquals(ImmutableList.of(msg), actualRequest.getWorkerMessages());
}
+ @Test
+ public void testReportWorkerMessage_perWorkerMetrics() throws Exception {
+ MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
+ response.setContentType(Json.MEDIA_TYPE);
+ SendWorkerMessagesResponse workerMessage = new
SendWorkerMessagesResponse();
+ workerMessage.setFactory(Transport.getJsonFactory());
+ response.setContent(workerMessage.toPrettyString());
+ when(request.execute()).thenReturn(response);
+ PerStepNamespaceMetrics stepNamespaceMetrics =
+ new PerStepNamespaceMetrics()
+ .setOriginalStep("s1")
+ .setMetricsNamespace("ns")
+ .setMetricValues(
+ Collections.singletonList(new
MetricValue().setMetric("metric").setValueInt64(3L)));
+ PerWorkerMetrics perWorkerMetrics =
+ new PerWorkerMetrics()
+
.setPerStepNamespaceMetrics(Collections.singletonList(stepNamespaceMetrics));
+
+ WorkUnitClient client = new DataflowWorkUnitClient(pipelineOptions, LOG);
+ WorkerMessage perWorkerMetricsMsg =
+ client.createWorkerMessageFromPerWorkerMetrics(perWorkerMetrics);
+ client.reportWorkerMessage(Collections.singletonList(perWorkerMetricsMsg));
+
+ SendWorkerMessagesRequest actualRequest =
+ Transport.getJsonFactory()
+ .fromString(request.getContentAsString(),
SendWorkerMessagesRequest.class);
+ assertEquals(ImmutableList.of(perWorkerMetricsMsg),
actualRequest.getWorkerMessages());
+ }
+
private LowLevelHttpResponse generateMockResponse(WorkItem... workItems)
throws Exception {
MockLowLevelHttpResponse response = new MockLowLevelHttpResponse();
response.setContentType(Json.MEDIA_TYPE);