This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e378f13  [BEAM-7820] Add hot key detection to Batch Dataflow Worker (#9186)
e378f13 is described below

commit e378f1357f3a22c521804fc42a30387eee9d15b1
Author: Sam sam <[email protected]>
AuthorDate: Tue Jul 30 14:52:31 2019 -0700

    [BEAM-7820] Add hot key detection to Batch Dataflow Worker (#9186)
    
    * Add basic hot key detection logging in Worker, will add throttling next
    
    * spotless
    
    * only log when valid
    
    * Add unit test
    
    * Add throttling mechanism
    
    * Add comment
    
    * spotless
    
    * add back visible for testing
---
 .../worker/DataflowWorkProgressUpdater.java        | 48 +++++++++++++++++
 .../util/common/worker/WorkProgressUpdater.java    |  2 +-
 .../worker/DataflowWorkProgressUpdaterTest.java    | 61 ++++++++++++++++++++++
 3 files changed, 110 insertions(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
index becfed7..be2272a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
@@ -22,8 +22,10 @@ import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
 
 import com.google.api.client.util.Clock;
 import com.google.api.services.dataflow.model.ApproximateSplitRequest;
+import com.google.api.services.dataflow.model.HotKeyDetection;
 import com.google.api.services.dataflow.model.WorkItem;
 import com.google.api.services.dataflow.model.WorkItemServiceState;
+import java.text.MessageFormat;
 import java.util.concurrent.ScheduledExecutorService;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor;
@@ -39,6 +41,7 @@ import org.slf4j.LoggerFactory;
  */
 @NotThreadSafe
 public class DataflowWorkProgressUpdater extends WorkProgressUpdater {
+
   private static final Logger LOG = LoggerFactory.getLogger(DataflowWorkProgressUpdater.class);
 
   private final WorkItemStatusClient workItemStatusClient;
@@ -46,6 +49,12 @@ public class DataflowWorkProgressUpdater extends WorkProgressUpdater {
   /** The WorkItem for which work progress updates are sent. */
   private final WorkItem workItem;
 
+  /**
+   * The previous time the HotKeyDetection was logged. This is used to throttle logging to every 5
+   * minutes.
+   */
+  private long prevHotKeyDetectionLogMs = 0;
+
   public DataflowWorkProgressUpdater(
       WorkItemStatusClient workItemStatusClient, WorkItem workItem, WorkExecutor worker) {
     super(worker, Integer.MAX_VALUE);
@@ -90,7 +99,12 @@ public class DataflowWorkProgressUpdater extends WorkProgressUpdater {
     WorkItemServiceState result =
         workItemStatusClient.reportUpdate(
             dynamicSplitResultToReport, Duration.millis(requestedLeaseDurationMs));
+
     if (result != null) {
+      if (shouldLogHotKeyMessage(result)) {
+        LOG.warn(getHotKeyMessage(result));
+      }
+
       // Resets state after a successful progress report.
       dynamicSplitResultToReport = null;
 
@@ -109,6 +123,40 @@ public class DataflowWorkProgressUpdater extends WorkProgressUpdater {
     }
   }
 
+  /**
+   * Returns true if the class should log the HotKeyMessage. This method throttles logging to every
+   * 5 minutes.
+   */
+  protected boolean shouldLogHotKeyMessage(WorkItemServiceState workItemServiceState) {
+    String hotKeyMessage = getHotKeyMessage(workItemServiceState);
+    if (hotKeyMessage.isEmpty()) {
+      return false;
+    }
+
+    // Throttle logging the HotKeyDetection to every 5 minutes.
+    long nowMs = clock.currentTimeMillis();
+    if (nowMs - prevHotKeyDetectionLogMs < Duration.standardMinutes(5).getMillis()) {
+      return false;
+    }
+    prevHotKeyDetectionLogMs = nowMs;
+
+    return true;
+  }
+
+  protected String getHotKeyMessage(WorkItemServiceState workItemServiceState) {
+    if (workItemServiceState.getHotKeyDetection() == null
+        || workItemServiceState.getHotKeyDetection().getUserStepName() == null) {
+      return "";
+    }
+
+    HotKeyDetection hotKeyDetection = workItemServiceState.getHotKeyDetection();
+    return MessageFormat.format(
+        "A hot key was detected in step ''{0}'' with age of ''{1}''. This is"
+            + " a symptom of key distribution being skewed. To fix, please inspect your data and "
+            + "pipeline to ensure that elements are evenly distributed across your key space.",
+        hotKeyDetection.getUserStepName(), hotKeyDetection.getHotKeyAge());
+  }
+
   /** Returns the given work unit's lease expiration timestamp. */
   private long getLeaseExpirationTimestamp(WorkItem workItem) {
     return fromCloudTime(workItem.getLeaseExpireTime()).getMillis();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
index 7f87914..c52c1c9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
@@ -82,7 +82,7 @@ public abstract class WorkProgressUpdater {
   private final ScheduledExecutorService executor;
 
   /** Clock used to either provide real system time or mocked to virtualize time for testing. */
-  private final Clock clock;
+  protected final Clock clock;
 
   /** The lease duration to request from the external worker service. */
   protected long requestedLeaseDurationMs;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdaterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdaterTest.java
index 648c578..8978145 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdaterTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdaterTest.java
@@ -23,6 +23,8 @@ import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.clo
 import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.cloudProgressToReaderProgress;
 import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.toDynamicSplitRequest;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.atLeastOnce;
@@ -32,6 +34,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import com.google.api.client.testing.http.FixedClock;
+import com.google.api.services.dataflow.model.HotKeyDetection;
 import com.google.api.services.dataflow.model.Position;
 import com.google.api.services.dataflow.model.WorkItem;
 import com.google.api.services.dataflow.model.WorkItemServiceState;
@@ -54,9 +57,12 @@ import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.slf4j.LoggerFactory;
 
 /** Unit tests for {@link DataflowWorkProgressUpdater}. */
 @RunWith(JUnit4.class)
+@PrepareForTest({DataflowWorkProgressUpdater.class, LoggerFactory.class})
 public class DataflowWorkProgressUpdaterTest {
 
   private static final long LEASE_MS = 2000;
@@ -64,6 +70,8 @@ public class DataflowWorkProgressUpdaterTest {
   private static final String PROJECT_ID = "TEST_PROJECT_ID";
   private static final String JOB_ID = "TEST_JOB_ID";
   private static final Long WORK_ID = 1234567890L;
+  private static final String STEP_ID = "TEST_STEP_ID";
+  private static final String HOT_KEY_AGE = "1s";
 
   @Rule public final ExpectedException thrown = ExpectedException.none();
 
@@ -94,6 +102,7 @@ public class DataflowWorkProgressUpdaterTest {
     progressUpdater =
         new DataflowWorkProgressUpdater(
             workItemStatusClient, workItem, worker, executor.getExecutor(), clock) {
+
           // Shorten reporting interval boundaries for faster testing.
           @Override
           protected long getMinReportingInterval() {
@@ -234,6 +243,53 @@ public class DataflowWorkProgressUpdaterTest {
     verifyNoMoreInteractions(workItemStatusClient);
   }
 
+  @Test
+  public void correctHotKeyMessage() {
+    WorkItemServiceState s = new WorkItemServiceState();
+
+    String m = progressUpdater.getHotKeyMessage(s);
+    assertTrue(m.isEmpty());
+
+    HotKeyDetection hotKeyDetection = new HotKeyDetection();
+    hotKeyDetection.setUserStepName(STEP_ID);
+    hotKeyDetection.setHotKeyAge(HOT_KEY_AGE);
+    s.setHotKeyDetection(hotKeyDetection);
+
+    m = progressUpdater.getHotKeyMessage(s);
+    assertEquals(
+        "A hot key was detected in step 'TEST_STEP_ID' with age of '1s'. This is a "
+            + "symptom of key distribution being skewed. To fix, please inspect your data and "
+            + "pipeline to ensure that elements are evenly distributed across your key space.",
+        m);
+  }
+
+  @Test
+  public void canLogHotKeyMessage() {
+    WorkItemServiceState s = new WorkItemServiceState();
+
+    String m = progressUpdater.getHotKeyMessage(s);
+    assertTrue(m.isEmpty());
+
+    HotKeyDetection hotKeyDetection = new HotKeyDetection();
+    hotKeyDetection.setUserStepName("step");
+    hotKeyDetection.setHotKeyAge(toCloudDuration(Duration.millis(1000)));
+    s.setHotKeyDetection(hotKeyDetection);
+
+    clock.setTime(0L);
+    assertFalse(progressUpdater.shouldLogHotKeyMessage(s));
+
+    // The class throttles every 5 minutes, so the first time it is called is true. The second time
+    // is throttled and returns false.
+    clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(5L).getMillis());
+    assertTrue(progressUpdater.shouldLogHotKeyMessage(s));
+    assertFalse(progressUpdater.shouldLogHotKeyMessage(s));
+
+    // Test that the state variable is set and can log again in 5 minutes.
+    clock.setTime(clock.currentTimeMillis() + Duration.standardMinutes(5L).getMillis());
+    assertTrue(progressUpdater.shouldLogHotKeyMessage(s));
+    assertFalse(progressUpdater.shouldLogHotKeyMessage(s));
+  }
+
   private WorkItemServiceState generateServiceState(
       @Nullable Position suggestedStopPosition, long millisToNextUpdate) {
     WorkItemServiceState responseState = new WorkItemServiceState();
@@ -247,6 +303,11 @@ public class DataflowWorkProgressUpdaterTest {
           ReaderTestUtils.approximateSplitRequestAtPosition(suggestedStopPosition));
     }
 
+    HotKeyDetection hotKeyDetection = new HotKeyDetection();
+    hotKeyDetection.setUserStepName(STEP_ID);
+    hotKeyDetection.setHotKeyAge(HOT_KEY_AGE);
+    responseState.setHotKeyDetection(hotKeyDetection);
+
     return responseState;
   }
 }

Reply via email to