Repository: flink
Updated Branches:
  refs/heads/release-1.2 09239ea16 -> 7fbb115bc


[FLINK-6170] [metrics] Don't rely on stats snapshot for checkpoint metrics

This closes #3597.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fbb115b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fbb115b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fbb115b

Branch: refs/heads/release-1.2
Commit: 7fbb115bc539cf0d63ad44c7063cf1733b94e861
Parents: 09239ea
Author: Ufuk Celebi <[email protected]>
Authored: Wed Mar 22 18:08:13 2017 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Mar 22 21:21:56 2017 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointStatsTracker.java      |  30 +--
 .../checkpoint/CheckpointStatsTrackerTest.java  | 232 +++++++++++++++++--
 2 files changed, 235 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7fbb115b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index d324c25..c7efb7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
@@ -26,14 +33,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 
-import javax.annotation.Nullable;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Tracker for checkpoint statistics.
  *
@@ -96,6 +95,10 @@ public class CheckpointStatsTracker {
         */
        private volatile boolean dirty;
 
+       /** The latest completed checkpoint. Used by the latest completed checkpoint metrics. */
+       @Nullable
+       private volatile transient CompletedCheckpointStats 
latestCompletedCheckpoint;
+
        /**
         * Creates a new checkpoint stats tracker.
         *
@@ -241,6 +244,8 @@ public class CheckpointStatsTracker {
        private void reportCompletedCheckpoint(CompletedCheckpointStats 
completed) {
                statsReadWriteLock.lock();
                try {
+                       latestCompletedCheckpoint = completed;
+
                        counts.incrementCompletedCheckpoints();
                        history.replacePendingCheckpointById(completed);
 
@@ -400,7 +405,7 @@ public class CheckpointStatsTracker {
        private class LatestCompletedCheckpointSizeGauge implements Gauge<Long> 
{
                @Override
                public Long getValue() {
-                       CompletedCheckpointStats completed = 
latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+                       CompletedCheckpointStats completed = 
latestCompletedCheckpoint;
                        if (completed != null) {
                                return completed.getStateSize();
                        } else {
@@ -412,7 +417,7 @@ public class CheckpointStatsTracker {
        private class LatestCompletedCheckpointDurationGauge implements 
Gauge<Long> {
                @Override
                public Long getValue() {
-                       CompletedCheckpointStats completed = 
latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+                       CompletedCheckpointStats completed = 
latestCompletedCheckpoint;
                        if (completed != null) {
                                return completed.getEndToEndDuration();
                        } else {
@@ -421,11 +426,10 @@ public class CheckpointStatsTracker {
                }
        }
 
-
        private class LatestCompletedCheckpointAlignmentBufferedGauge 
implements Gauge<Long> {
                @Override
                public Long getValue() {
-                       CompletedCheckpointStats completed = 
latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+                       CompletedCheckpointStats completed = 
latestCompletedCheckpoint;;
                        if (completed != null) {
                                return completed.getAlignmentBuffered();
                        } else {
@@ -437,7 +441,7 @@ public class CheckpointStatsTracker {
        private class LatestCompletedCheckpointExternalPathGauge implements 
Gauge<String> {
                @Override
                public String getValue() {
-                       CompletedCheckpointStats completed = 
latestSnapshot.getHistory().getLatestCompletedCheckpoint();
+                       CompletedCheckpointStats completed = 
latestCompletedCheckpoint;
                        if (completed != null) {
                                return completed.getExternalPath();
                        } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/7fbb115b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 9a39182..f7696ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -18,18 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.Iterator;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -42,6 +30,23 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.junit.Test;
+
 public class CheckpointStatsTrackerTest {
 
        /**
@@ -274,10 +279,10 @@ public class CheckpointStatsTrackerTest {
        }
 
        /**
-        * Tests the registered metrics.
+        * Tests the registration of the checkpoint metrics.
         */
        @Test
-       public void testMetrics() throws Exception {
+       public void testMetricsRegistration() throws Exception {
                MetricGroup metricGroup = mock(MetricGroup.class);
 
                ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
@@ -304,6 +309,205 @@ public class CheckpointStatsTrackerTest {
                verify(metricGroup, times(9)).gauge(any(String.class), 
any(Gauge.class));
        }
 
+       /**
+        * Tests that the metrics are updated properly. We had a bug that required new stats
+        * snapshots in order to update the metrics.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testMetricsAreUpdated() throws Exception {
+               final Map<String, Gauge<?>> registeredGauges = new HashMap<>();
+
+               MetricGroup metricGroup = new MetricGroup() {
+                       @Override
+                       public Counter counter(int name) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public Counter counter(String name) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public <C extends Counter> C counter(int name, C 
counter) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public <C extends Counter> C counter(String name, C 
counter) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public <T, G extends Gauge<T>> G gauge(int name, G 
gauge) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public <T, G extends Gauge<T>> G gauge(String name, G 
gauge) {
+                               registeredGauges.put(name, gauge);
+                               return gauge;
+                       }
+
+                       @Override
+                       public <H extends Histogram> H histogram(String name, H 
histogram) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public <H extends Histogram> H histogram(int name, H 
histogram) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public <M extends Meter> M meter(String name, M meter) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public <M extends Meter> M meter(int name, M meter) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public MetricGroup addGroup(int name) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public MetricGroup addGroup(String name) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public String[] getScopeComponents() {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public Map<String, String> getAllVariables() {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public String getMetricIdentifier(String metricName) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+
+                       @Override
+                       public String getMetricIdentifier(String metricName, 
CharacterFilter filter) {
+                               throw new UnsupportedOperationException("Not 
expected in this test");
+                       }
+               };
+
+               ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+               when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
+               when(jobVertex.getParallelism()).thenReturn(1);
+
+               CheckpointStatsTracker stats = new CheckpointStatsTracker(
+                       0,
+                       Collections.singletonList(jobVertex),
+                       mock(JobSnapshottingSettings.class),
+                       metricGroup);
+
+               // Make sure to adjust this test if metrics are added/removed
+               assertEquals(9, registeredGauges.size());
+
+               // Check initial values
+               Gauge<Long> numCheckpoints = (Gauge<Long>) 
registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_CHECKPOINTS_METRIC);
+               Gauge<Integer> numInProgressCheckpoints = (Gauge<Integer>) 
registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC);
+               Gauge<Long> numCompletedCheckpoints = (Gauge<Long>) 
registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC);
+               Gauge<Long> numFailedCheckpoints = (Gauge<Long>) 
registeredGauges.get(CheckpointStatsTracker.NUMBER_OF_FAILED_CHECKPOINTS_METRIC);
+               Gauge<Long> latestRestoreTimestamp = (Gauge<Long>) 
registeredGauges.get(CheckpointStatsTracker.LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC);
+               Gauge<Long> latestCompletedSize = (Gauge<Long>) 
registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC);
+               Gauge<Long> latestCompletedDuration = (Gauge<Long>) 
registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC);
+               Gauge<Long> latestCompletedAlignmentBuffered = (Gauge<Long>) 
registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_ALIGNMENT_BUFFERED_METRIC);
+               Gauge<String> latestCompletedExternalPath = (Gauge<String>) 
registeredGauges.get(CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC);
+
+               assertEquals(Long.valueOf(0), numCheckpoints.getValue());
+               assertEquals(Integer.valueOf(0), 
numInProgressCheckpoints.getValue());
+               assertEquals(Long.valueOf(0), 
numCompletedCheckpoints.getValue());
+               assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
+               assertEquals(Long.valueOf(-1), 
latestRestoreTimestamp.getValue());
+               assertEquals(Long.valueOf(-1), latestCompletedSize.getValue());
+               assertEquals(Long.valueOf(-1), 
latestCompletedDuration.getValue());
+               assertEquals(Long.valueOf(-1), 
latestCompletedAlignmentBuffered.getValue());
+               assertEquals("n/a", latestCompletedExternalPath.getValue());
+
+               PendingCheckpointStats pending = stats.reportPendingCheckpoint(
+                       0,
+                       0,
+                       CheckpointProperties.forStandardCheckpoint());
+
+               // Check counts
+               assertEquals(Long.valueOf(1), numCheckpoints.getValue());
+               assertEquals(Integer.valueOf(1), 
numInProgressCheckpoints.getValue());
+               assertEquals(Long.valueOf(0), 
numCompletedCheckpoints.getValue());
+               assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
+
+               long ackTimestamp = 11231230L;
+               long stateSize = 12381238L;
+               long ignored = 0;
+               long alignmenetBuffered = 182812L;
+               String externalPath = "myexternalpath";
+
+               SubtaskStateStats subtaskStats = new SubtaskStateStats(
+                       0,
+                       ackTimestamp,
+                       stateSize,
+                       ignored,
+                       ignored,
+                       alignmenetBuffered,
+                       ignored);
+
+               
assertTrue(pending.reportSubtaskStats(jobVertex.getJobVertexId(), 
subtaskStats));
+
+               pending.reportCompletedCheckpoint(externalPath);
+
+               // Verify completed checkpoint updated
+               assertEquals(Long.valueOf(1), numCheckpoints.getValue());
+               assertEquals(Integer.valueOf(0), 
numInProgressCheckpoints.getValue());
+               assertEquals(Long.valueOf(1), 
numCompletedCheckpoints.getValue());
+               assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
+               assertEquals(Long.valueOf(-1), 
latestRestoreTimestamp.getValue());
+               assertEquals(Long.valueOf(stateSize), 
latestCompletedSize.getValue());
+               assertEquals(Long.valueOf(ackTimestamp), 
latestCompletedDuration.getValue());
+               assertEquals(Long.valueOf(alignmenetBuffered), 
latestCompletedAlignmentBuffered.getValue());
+               assertEquals(externalPath, 
latestCompletedExternalPath.getValue());
+
+               // Check failed
+               PendingCheckpointStats nextPending = 
stats.reportPendingCheckpoint(
+                       1,
+                       11,
+                       CheckpointProperties.forStandardCheckpoint());
+
+               long failureTimestamp = 1230123L;
+               nextPending.reportFailedCheckpoint(failureTimestamp, null);
+
+               // Verify updated
+               assertEquals(Long.valueOf(2), numCheckpoints.getValue());
+               assertEquals(Integer.valueOf(0), 
numInProgressCheckpoints.getValue());
+               assertEquals(Long.valueOf(1), 
numCompletedCheckpoints.getValue());
+               assertEquals(Long.valueOf(1), numFailedCheckpoints.getValue()); 
// one failed now
+
+               // Check restore
+               long restoreTimestamp = 183419283L;
+               RestoredCheckpointStats restored = new RestoredCheckpointStats(
+                       1,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       restoreTimestamp,
+                       null);
+               stats.reportRestoredCheckpoint(restored);
+
+               assertEquals(Long.valueOf(2), numCheckpoints.getValue());
+               assertEquals(Integer.valueOf(0), 
numInProgressCheckpoints.getValue());
+               assertEquals(Long.valueOf(1), 
numCompletedCheckpoints.getValue());
+               assertEquals(Long.valueOf(1), numFailedCheckpoints.getValue());
+
+               assertEquals(Long.valueOf(restoreTimestamp), 
latestRestoreTimestamp.getValue());
+       }
+
        // ------------------------------------------------------------------------
 
        /**

Reply via email to