Repository: flink Updated Branches: refs/heads/release-1.2 09239ea16 -> 7fbb115bc
[FLINK-6170] [metrics] Don't rely on stats snapshot for checkpoint metrics This closes #3597. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fbb115b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fbb115b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fbb115b Branch: refs/heads/release-1.2 Commit: 7fbb115bc539cf0d63ad44c7063cf1733b94e861 Parents: 09239ea Author: Ufuk Celebi <[email protected]> Authored: Wed Mar 22 18:08:13 2017 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Wed Mar 22 21:21:56 2017 +0100 ---------------------------------------------------------------------- .../checkpoint/CheckpointStatsTracker.java | 30 +-- .../checkpoint/CheckpointStatsTrackerTest.java | 232 +++++++++++++++++-- 2 files changed, 235 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7fbb115b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index d324c25..c7efb7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -18,6 +18,13 @@ package org.apache.flink.runtime.checkpoint; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nullable; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; @@ -26,14 +33,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; -import javax.annotation.Nullable; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** * Tracker for checkpoint statistics. * @@ -96,6 +95,10 @@ public class CheckpointStatsTracker { */ private volatile boolean dirty; + /** The latest completed checkpoint. Used by the latest completed checkpoint metrics. */ + @Nullable + private volatile transient CompletedCheckpointStats latestCompletedCheckpoint; + /** * Creates a new checkpoint stats tracker. * @@ -241,6 +244,8 @@ public class CheckpointStatsTracker { private void reportCompletedCheckpoint(CompletedCheckpointStats completed) { statsReadWriteLock.lock(); try { + latestCompletedCheckpoint = completed; + counts.incrementCompletedCheckpoints(); history.replacePendingCheckpointById(completed); @@ -400,7 +405,7 @@ public class CheckpointStatsTracker { private class LatestCompletedCheckpointSizeGauge implements Gauge<Long> { @Override public Long getValue() { - CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint(); + CompletedCheckpointStats completed = latestCompletedCheckpoint; if (completed != null) { return completed.getStateSize(); } else { @@ -412,7 +417,7 @@ public class CheckpointStatsTracker { private class LatestCompletedCheckpointDurationGauge implements Gauge<Long> { @Override public Long getValue() { - CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint(); + CompletedCheckpointStats completed = latestCompletedCheckpoint; if (completed != null) { return completed.getEndToEndDuration(); } else { @@ -421,11 +426,10 @@ public class CheckpointStatsTracker { } } - private class LatestCompletedCheckpointAlignmentBufferedGauge implements Gauge<Long> { @Override public Long getValue() { - CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint(); + CompletedCheckpointStats completed = latestCompletedCheckpoint;; if (completed != null) { return completed.getAlignmentBuffered(); } else { @@ -437,7 +441,7 @@ public class CheckpointStatsTracker { private class LatestCompletedCheckpointExternalPathGauge implements Gauge<String> { @Override public String getValue() { - CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint(); + CompletedCheckpointStats completed = latestCompletedCheckpoint; if (completed != null) { return completed.getExternalPath(); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/7fbb115b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index 9a39182..f7696ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -18,18 +18,6 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; -import org.junit.Test; - -import java.util.Collections; -import java.util.Iterator; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -42,6 +30,23 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.junit.Test; + public class CheckpointStatsTrackerTest { /** @@ -274,10 +279,10 @@ public class CheckpointStatsTrackerTest { } /** - * Tests the registered metrics. + * Tests the registration of the checkpoint metrics. */ @Test - public void testMetrics() throws Exception { + public void testMetricsRegistration() throws Exception { MetricGroup metricGroup = mock(MetricGroup.class); ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class); @@ -304,6 +309,205 @@ public class CheckpointStatsTrackerTest { verify(metricGroup, times(9)).gauge(any(String.class), any(Gauge.class)); } + /** + * Tests that the metrics are updated properly. We had a bug that required new stats + * snapshots in order to update the metrics. + */ + @Test + @SuppressWarnings("unchecked") + public void testMetricsAreUpdated() throws Exception { + final Map<String, Gauge<?>> registeredGauges = new HashMap<>(); + + MetricGroup metricGroup = new MetricGroup() { + @Override + public Counter counter(int name) { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public Counter counter(String name) { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public <C extends Counter> C counter(int name, C counter) { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public <C extends Counter> C counter(String name, C counter) { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public <T, G extends Gauge<T>> G gauge(int name, G gauge) { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public <T, G extends Gauge<T>> G gauge(String name, G gauge) { + registeredGauges.put(name, gauge); + return gauge; + } + + @Override + public <H extends Histogram> H histogram(String name, H histogram) { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public <H extends Histogram> H histogram(int name, H histogram) { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public <M extends Meter> M meter(String name, M meter) { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public <M extends Meter> M meter(int name, M meter) { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public MetricGroup addGroup(int name) { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public MetricGroup addGroup(String name) { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public String[] getScopeComponents() { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public Map<String, String> getAllVariables() { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public String getMetricIdentifier(String metricName) { + throw new UnsupportedOperationException("Not expected in this test"); + } + + @Override + public String getMetricIdentifier(String metricName, CharacterFilter filter) { + throw new UnsupportedOperationException("Not expected in this test"); + } + }; + + ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class); + when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); + when(jobVertex.getParallelism()).thenReturn(1); + + CheckpointStatsTracker stats = new CheckpointStatsTracker( + 0, + Collections.singletonList(jobVertex), + mock(JobSnapshottingSettings.class), + metricGroup); + + // Make sure to adjust this test if metrics are added/removed + assertEquals(9, registeredGauges.size()); + + // Check initial values + Gauge<Long> numCheckpoints = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_CHECKPOINTS_METRIC); + Gauge<Integer> numInProgressCheckpoints = (Gauge<Integer>) registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC); + Gauge<Long> numCompletedCheckpoints = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC); + Gauge<Long> numFailedCheckpoints = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_FAILED_CHECKPOINTS_METRIC); + Gauge<Long> latestRestoreTimestamp = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC); + Gauge<Long> latestCompletedSize = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC); + Gauge<Long> latestCompletedDuration = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC); + Gauge<Long> latestCompletedAlignmentBuffered = (Gauge<Long>) registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_ALIGNMENT_BUFFERED_METRIC); + Gauge<String> latestCompletedExternalPath = (Gauge<String>) registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC); + + assertEquals(Long.valueOf(0), numCheckpoints.getValue()); + assertEquals(Integer.valueOf(0), numInProgressCheckpoints.getValue()); + assertEquals(Long.valueOf(0), numCompletedCheckpoints.getValue()); + assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue()); + assertEquals(Long.valueOf(-1), latestRestoreTimestamp.getValue()); + assertEquals(Long.valueOf(-1), latestCompletedSize.getValue()); + assertEquals(Long.valueOf(-1), latestCompletedDuration.getValue()); + assertEquals(Long.valueOf(-1), latestCompletedAlignmentBuffered.getValue()); + assertEquals("n/a", latestCompletedExternalPath.getValue()); + + PendingCheckpointStats pending = stats.reportPendingCheckpoint( + 0, + 0, + CheckpointProperties.forStandardCheckpoint()); + + // Check counts + assertEquals(Long.valueOf(1), numCheckpoints.getValue()); + assertEquals(Integer.valueOf(1), numInProgressCheckpoints.getValue()); + assertEquals(Long.valueOf(0), numCompletedCheckpoints.getValue()); + assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue()); + + long ackTimestamp = 11231230L; + long stateSize = 12381238L; + long ignored = 0; + long alignmenetBuffered = 182812L; + String externalPath = "myexternalpath"; + + SubtaskStateStats subtaskStats = new SubtaskStateStats( + 0, + ackTimestamp, + stateSize, + ignored, + ignored, + alignmenetBuffered, + ignored); + + assertTrue(pending.reportSubtaskStats(jobVertex.getJobVertexId(), subtaskStats)); + + pending.reportCompletedCheckpoint(externalPath); + + // Verify completed checkpoint updated + assertEquals(Long.valueOf(1), numCheckpoints.getValue()); + assertEquals(Integer.valueOf(0), numInProgressCheckpoints.getValue()); + assertEquals(Long.valueOf(1), numCompletedCheckpoints.getValue()); + assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue()); + assertEquals(Long.valueOf(-1), latestRestoreTimestamp.getValue()); + assertEquals(Long.valueOf(stateSize), latestCompletedSize.getValue()); + assertEquals(Long.valueOf(ackTimestamp), latestCompletedDuration.getValue()); + assertEquals(Long.valueOf(alignmenetBuffered), latestCompletedAlignmentBuffered.getValue()); + assertEquals(externalPath, latestCompletedExternalPath.getValue()); + + // Check failed + PendingCheckpointStats nextPending = stats.reportPendingCheckpoint( + 1, + 11, + CheckpointProperties.forStandardCheckpoint()); + + long failureTimestamp = 1230123L; + nextPending.reportFailedCheckpoint(failureTimestamp, null); + + // Verify updated + assertEquals(Long.valueOf(2), numCheckpoints.getValue()); + assertEquals(Integer.valueOf(0), numInProgressCheckpoints.getValue()); + assertEquals(Long.valueOf(1), numCompletedCheckpoints.getValue()); + assertEquals(Long.valueOf(1), numFailedCheckpoints.getValue()); // one failed now + + // Check restore + long restoreTimestamp = 183419283L; + RestoredCheckpointStats restored = new RestoredCheckpointStats( + 1, + CheckpointProperties.forStandardCheckpoint(), + restoreTimestamp, + null); + stats.reportRestoredCheckpoint(restored); + + assertEquals(Long.valueOf(2), numCheckpoints.getValue()); + assertEquals(Integer.valueOf(0), numInProgressCheckpoints.getValue()); + assertEquals(Long.valueOf(1), numCompletedCheckpoints.getValue()); + assertEquals(Long.valueOf(1), numFailedCheckpoints.getValue()); + + assertEquals(Long.valueOf(restoreTimestamp), latestRestoreTimestamp.getValue()); + } + // ------------------------------------------------------------------------ /**
