This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/hot_key_cp
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b106c72cb683e1689eec70a5056d1402a868c33d
Author: Danny McCormick <[email protected]>
AuthorDate: Thu Aug 22 20:04:18 2024 +0100

    Check experiment for hotkey logging (#32285)
    
    * Check experiment for hotkey logging
    
    * Spotless
---
 .../org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java | 2 +-
 .../beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java    | 3 ++-
 .../worker/windmill/work/processing/StreamingWorkScheduler.java      | 5 ++++-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 1fedcd8f3a2..c01096716c9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -423,7 +423,7 @@ public class DataflowPipelineTranslator {
       if (options.getDataflowKmsKey() != null) {
         environment.setServiceKmsKeyName(options.getDataflowKmsKey());
       }
-      if (options.isHotKeyLoggingEnabled()) {
+      if (options.isHotKeyLoggingEnabled() || hasExperiment(options, "enable_hot_key_logging")) {
         DebugOptions debugOptions = new DebugOptions();
         debugOptions.setEnableHotKeyLogging(true);
         environment.setDebugOptions(debugOptions);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
index f901b01ed56..b94759c239a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.dataflow.worker;
 
 import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudDuration;
 import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
 
 import com.google.api.client.util.Clock;
 import com.google.api.services.dataflow.model.ApproximateSplitRequest;
@@ -132,7 +133,7 @@ public class DataflowWorkProgressUpdater extends WorkProgressUpdater {
 
         // The key set the in BatchModeExecutionContext is only set in the GroupingShuffleReader
         // which is the correct key. The key is also translated into a Java object in the reader.
-        if (options.isHotKeyLoggingEnabled()) {
+        if (options.isHotKeyLoggingEnabled() || hasExperiment(options, "enable_hot_key_logging")) {
           hotKeyLogger.logHotKeyDetection(
               hotKeyDetection.getUserStepName(),
               TimeUtil.fromCloudDuration(hotKeyDetection.getHotKeyAge()),
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index b0b6377dd8b..86f2cffe604 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
 
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
+
 import com.google.api.services.dataflow.model.MapTask;
 import com.google.auto.value.AutoValue;
 import java.util.Collection;
@@ -368,7 +370,8 @@ public final class StreamingWorkScheduler {
         Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() / 1000);
 
         String stepName = getShuffleTaskStepName(computationState.getMapTask());
-        if (options.isHotKeyLoggingEnabled() && keyCoder.isPresent()) {
+        if ((options.isHotKeyLoggingEnabled() || hasExperiment(options, "enable_hot_key_logging"))
+            && keyCoder.isPresent()) {
           hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey);
         } else {
           hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);

Reply via email to