This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e378f13 [BEAM-7820] Add hot key detection to Batch Dataflow Worker (#9186)
e378f13 is described below
commit e378f1357f3a22c521804fc42a30387eee9d15b1
Author: Sam sam <[email protected]>
AuthorDate: Tue Jul 30 14:52:31 2019 -0700
[BEAM-7820] Add hot key detection to Batch Dataflow Worker (#9186)
* Add basic hot key detection logging in Worker, will add throttling next
* spotless
* only log when valid
* Add unit test
* Add throttling mechanism
* Add comment
* spotless
* add back visible for testing
---
.../worker/DataflowWorkProgressUpdater.java | 48 +++++++++++++++++
.../util/common/worker/WorkProgressUpdater.java | 2 +-
.../worker/DataflowWorkProgressUpdaterTest.java | 61 ++++++++++++++++++++++
3 files changed, 110 insertions(+), 1 deletion(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
index becfed7..be2272a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java
@@ -22,8 +22,10 @@ import static org.apache.beam.runners.dataflow.util.TimeUtil.fromCloudTime;
import com.google.api.client.util.Clock;
import com.google.api.services.dataflow.model.ApproximateSplitRequest;
+import com.google.api.services.dataflow.model.HotKeyDetection;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
+import java.text.MessageFormat;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor;
@@ -39,6 +41,7 @@ import org.slf4j.LoggerFactory;
*/
@NotThreadSafe
public class DataflowWorkProgressUpdater extends WorkProgressUpdater {
+
private static final Logger LOG =
LoggerFactory.getLogger(DataflowWorkProgressUpdater.class);
private final WorkItemStatusClient workItemStatusClient;
@@ -46,6 +49,12 @@ public class DataflowWorkProgressUpdater extends WorkProgressUpdater {
/** The WorkItem for which work progress updates are sent. */
private final WorkItem workItem;
+ /**
+ * Time, in milliseconds from {@code clock}, at which a hot key message was last approved for
+ * logging. Used to throttle hot key logging to at most once every 5 minutes.
+ *
+ * <p>Because this starts at 0, hot key messages are also suppressed until the clock reads at
+ * least 5 minutes.
+ */
+ private long prevHotKeyDetectionLogMs = 0;
+
public DataflowWorkProgressUpdater(
WorkItemStatusClient workItemStatusClient, WorkItem workItem,
WorkExecutor worker) {
super(worker, Integer.MAX_VALUE);
@@ -90,7 +99,12 @@ public class DataflowWorkProgressUpdater extends WorkProgressUpdater {
WorkItemServiceState result =
workItemStatusClient.reportUpdate(
dynamicSplitResultToReport,
Duration.millis(requestedLeaseDurationMs));
+
if (result != null) {
+ if (shouldLogHotKeyMessage(result)) {
+ LOG.warn(getHotKeyMessage(result));
+ }
+
// Resets state after a successful progress report.
dynamicSplitResultToReport = null;
@@ -109,6 +123,40 @@ public class DataflowWorkProgressUpdater extends WorkProgressUpdater {
}
}
+  /**
+   * Decides whether the hot key message for {@code workItemServiceState} should be logged now.
+   *
+   * <p>Returns {@code false} when {@link #getHotKeyMessage} yields an empty string (no usable hot
+   * key detection), or when fewer than 5 minutes have passed since the last approved message.
+   * This is not a pure query: when it returns {@code true} it records the current clock time,
+   * which starts a new 5-minute throttle window.
+   */
+  protected boolean shouldLogHotKeyMessage(WorkItemServiceState workItemServiceState) {
+    boolean hasHotKey = !getHotKeyMessage(workItemServiceState).isEmpty();
+    if (!hasHotKey) {
+      return false;
+    }
+
+    long currentTimeMs = clock.currentTimeMillis();
+    long throttleWindowMs = Duration.standardMinutes(5).getMillis();
+    long elapsedSinceLastLogMs = currentTimeMs - prevHotKeyDetectionLogMs;
+    if (elapsedSinceLastLogMs >= throttleWindowMs) {
+      // Remember this moment so further messages wait another full window.
+      prevHotKeyDetectionLogMs = currentTimeMs;
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Builds the user-facing warning for the hot key reported in {@code workItemServiceState}.
+   *
+   * @param workItemServiceState the state returned by the latest progress report
+   * @return a warning naming the step and the hot key age, or an empty string when the state has
+   *     no {@link HotKeyDetection} or the detection carries no user step name
+   */
+  protected String getHotKeyMessage(WorkItemServiceState workItemServiceState) {
+    HotKeyDetection detection = workItemServiceState.getHotKeyDetection();
+    boolean hasStepName = detection != null && detection.getUserStepName() != null;
+    if (!hasStepName) {
+      return "";
+    }
+
+    // Doubled single quotes are MessageFormat's escape for a literal quote character.
+    return MessageFormat.format(
+        "A hot key was detected in step ''{0}'' with age of ''{1}''. This is"
+            + " a symptom of key distribution being skewed. To fix, please inspect your data and "
+            + "pipeline to ensure that elements are evenly distributed across your key space.",
+        detection.getUserStepName(),
+        detection.getHotKeyAge());
+  }
+
/** Returns the given work unit's lease expiration timestamp. */
private long getLeaseExpirationTimestamp(WorkItem workItem) {
return fromCloudTime(workItem.getLeaseExpireTime()).getMillis();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
index 7f87914..c52c1c9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/WorkProgressUpdater.java
@@ -82,7 +82,7 @@ public abstract class WorkProgressUpdater {
private final ScheduledExecutorService executor;
/** Clock used to either provide real system time or mocked to virtualize
time for testing. */
- private final Clock clock;
+ protected final Clock clock;
/** The lease duration to request from the external worker service. */
protected long requestedLeaseDurationMs;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdaterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdaterTest.java
index 648c578..8978145 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdaterTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdaterTest.java
@@ -23,6 +23,8 @@ import static org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.clo
import static
org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.cloudProgressToReaderProgress;
import static
org.apache.beam.runners.dataflow.worker.SourceTranslationUtils.toDynamicSplitRequest;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.atLeastOnce;
@@ -32,6 +34,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.api.client.testing.http.FixedClock;
+import com.google.api.services.dataflow.model.HotKeyDetection;
import com.google.api.services.dataflow.model.Position;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
@@ -54,9 +57,12 @@ import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.slf4j.LoggerFactory;
/** Unit tests for {@link DataflowWorkProgressUpdater}. */
@RunWith(JUnit4.class)
+@PrepareForTest({DataflowWorkProgressUpdater.class, LoggerFactory.class})
public class DataflowWorkProgressUpdaterTest {
private static final long LEASE_MS = 2000;
@@ -64,6 +70,8 @@ public class DataflowWorkProgressUpdaterTest {
private static final String PROJECT_ID = "TEST_PROJECT_ID";
private static final String JOB_ID = "TEST_JOB_ID";
private static final Long WORK_ID = 1234567890L;
+ private static final String STEP_ID = "TEST_STEP_ID";
+ private static final String HOT_KEY_AGE = "1s";
@Rule public final ExpectedException thrown = ExpectedException.none();
@@ -94,6 +102,7 @@ public class DataflowWorkProgressUpdaterTest {
progressUpdater =
new DataflowWorkProgressUpdater(
workItemStatusClient, workItem, worker, executor.getExecutor(),
clock) {
+
// Shorten reporting interval boundaries for faster testing.
@Override
protected long getMinReportingInterval() {
@@ -234,6 +243,53 @@ public class DataflowWorkProgressUpdaterTest {
verifyNoMoreInteractions(workItemStatusClient);
}
+  /** Checks the hot key warning text, both with and without a HotKeyDetection attached. */
+  @Test
+  public void correctHotKeyMessage() {
+    // A service state without a HotKeyDetection produces no message.
+    WorkItemServiceState serviceState = new WorkItemServiceState();
+    assertTrue(progressUpdater.getHotKeyMessage(serviceState).isEmpty());
+
+    // Once a detection with a step name and age is attached, the full warning is produced.
+    HotKeyDetection detection = new HotKeyDetection();
+    detection.setUserStepName(STEP_ID);
+    detection.setHotKeyAge(HOT_KEY_AGE);
+    serviceState.setHotKeyDetection(detection);
+
+    String expectedMessage =
+        "A hot key was detected in step 'TEST_STEP_ID' with age of '1s'. This is a "
+            + "symptom of key distribution being skewed. To fix, please inspect your data and "
+            + "pipeline to ensure that elements are evenly distributed across your key space.";
+    assertEquals(expectedMessage, progressUpdater.getHotKeyMessage(serviceState));
+  }
+
+ @Test
+ public void canLogHotKeyMessage() {
+ WorkItemServiceState s = new WorkItemServiceState();
+
+ String m = progressUpdater.getHotKeyMessage(s);
+ assertTrue(m.isEmpty());
+
+ HotKeyDetection hotKeyDetection = new HotKeyDetection();
+ hotKeyDetection.setUserStepName("step");
+ hotKeyDetection.setHotKeyAge(toCloudDuration(Duration.millis(1000)));
+ s.setHotKeyDetection(hotKeyDetection);
+
+ clock.setTime(0L);
+ assertFalse(progressUpdater.shouldLogHotKeyMessage(s));
+
+ // The class throttles every 5 minutes, so the first time it is called is
true. The second time
+ // is throttled and returns false.
+ clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(5L).getMillis());
+ assertTrue(progressUpdater.shouldLogHotKeyMessage(s));
+ assertFalse(progressUpdater.shouldLogHotKeyMessage(s));
+
+ // Test that the state variable is set and can log again in 5 minutes.
+ clock.setTime(clock.currentTimeMillis() +
Duration.standardMinutes(5L).getMillis());
+ assertTrue(progressUpdater.shouldLogHotKeyMessage(s));
+ assertFalse(progressUpdater.shouldLogHotKeyMessage(s));
+ }
+
private WorkItemServiceState generateServiceState(
@Nullable Position suggestedStopPosition, long millisToNextUpdate) {
WorkItemServiceState responseState = new WorkItemServiceState();
@@ -247,6 +303,11 @@ public class DataflowWorkProgressUpdaterTest {
ReaderTestUtils.approximateSplitRequestAtPosition(suggestedStopPosition));
}
+ HotKeyDetection hotKeyDetection = new HotKeyDetection();
+ hotKeyDetection.setUserStepName(STEP_ID);
+ hotKeyDetection.setHotKeyAge(HOT_KEY_AGE);
+ responseState.setHotKeyDetection(hotKeyDetection);
+
return responseState;
}
}