This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch pull/8158/head
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e8e24c9993badc7a2a1752721bb19ae507ae0ff4
Author: zhuzhu.zz <[email protected]>
AuthorDate: Fri Apr 12 17:31:54 2019 +0800

    [FLINK-12131] Adjust IntermediateResult/IntermediateResultPartition status properly on ExecutionVertex reset
    
    1. Add a test for exact running producer counter verification
    2. Add back IntermediateResult.resetForNewExecution for tests
    3. Refine comments
    
    This closes #8158.
---
 .../runtime/executiongraph/ExecutionJobVertex.java |  5 --
 .../runtime/executiongraph/ExecutionVertex.java    |  5 ++
 .../runtime/executiongraph/IntermediateResult.java | 12 +++-
 .../IntermediateResultPartition.java               |  5 ++
 .../runtime/executiongraph/FailoverRegionTest.java | 77 ++++++++++++++++++++++
 .../IntermediateResultPartitionTest.java           | 45 +++++++++++++
 6 files changed, 143 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 5a3e7f2..6b1887c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -600,11 +600,6 @@ public class ExecutionJobVertex implements 
AccessExecutionJobVertex, Archiveable
                        catch (Throwable t) {
                                throw new RuntimeException("Re-creating the 
input split assigner failed: " + t.getMessage(), t);
                        }
-
-                       // Reset intermediate results
-                       for (IntermediateResult result : producedDataSets) {
-                               result.resetForNewExecution();
-                       }
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index d94cad9..86fb0ae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -634,6 +634,11 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                                        getExecutionGraph().vertexUnFinished();
                                }
 
+                               // reset the intermediate results
+                               for (IntermediateResultPartition 
resultPartition : resultPartitions.values()) {
+                                       resultPartition.resetForNewExecution();
+                               }
+
                                return newExecution;
                        }
                        else {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 6e1d9ba..eb316f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -154,13 +155,22 @@ public class IntermediateResult {
                return connectionIndex;
        }
 
+       @VisibleForTesting
        void resetForNewExecution() {
-               this.numberOfRunningProducers.set(numParallelProducers);
                for (IntermediateResultPartition partition : partitions) {
                        partition.resetForNewExecution();
                }
        }
 
+       @VisibleForTesting
+       int getNumberOfRunningProducers() {
+               return numberOfRunningProducers.get();
+       }
+
+       int incrementNumberOfRunningProducersAndGetRemaining() {
+               return numberOfRunningProducers.incrementAndGet();
+       }
+
        int decrementNumberOfRunningProducersAndGetRemaining() {
                return numberOfRunningProducers.decrementAndGet();
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index d4e85cf..5cdd376 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -86,6 +86,11 @@ public class IntermediateResultPartition {
        }
 
        void resetForNewExecution() {
+               if (getResultType().isBlocking() && hasDataProduced) {
+                       // A BLOCKING result partition with data produced means 
it is finished
+                       // Need to add the running producer count of the result 
on resetting it
+                       
totalResult.incrementNumberOfRunningProducersAndGetRemaining();
+               }
                hasDataProduced = false;
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index e555704..55890d7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -64,6 +64,7 @@ import 
org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Ignore;
@@ -449,6 +450,82 @@ public class FailoverRegionTest extends TestLogger {
                assertEquals(JobStatus.CANCELLING, 
strategy.getFailoverRegion(ev1).getState());
        }
 
+       @Test
+       public void testStatusResettingOnRegionFailover() throws Exception {
+               final JobID jobId = new JobID();
+               final String jobName = "Test Job Sample Name";
+
+               final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 
20);
+
+               JobVertex v1 = new JobVertex("vertex1");
+               JobVertex v2 = new JobVertex("vertex2");
+
+               v1.setParallelism(2);
+               v2.setParallelism(2);
+
+               v1.setInvokableClass(AbstractInvokable.class);
+               v2.setInvokableClass(AbstractInvokable.class);
+
+               v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+
+               List<JobVertex> ordered = Arrays.asList(v1, v2);
+
+               ExecutionGraph eg = new ExecutionGraph(
+                       new DummyJobInformation(
+                               jobId,
+                               jobName),
+                       TestingUtils.defaultExecutor(),
+                       TestingUtils.defaultExecutor(),
+                       AkkaUtils.getDefaultTimeout(),
+                       new InfiniteDelayRestartStrategy(10),
+                       new FailoverPipelinedRegionWithDirectExecutor(),
+                       slotProvider);
+
+               eg.attachJobGraph(ordered);
+               
eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+               RestartPipelinedRegionStrategy strategy = 
(RestartPipelinedRegionStrategy)eg.getFailoverStrategy();
+
+               ExecutionVertex ev11 = 
eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+               ExecutionVertex ev12 = 
eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+               ExecutionVertex ev21 = 
eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+               ExecutionVertex ev22 = 
eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+
+               eg.scheduleForExecution();
+
+               // initial state
+               assertEquals(ExecutionState.DEPLOYING, 
ev11.getExecutionState());
+               assertEquals(ExecutionState.DEPLOYING, 
ev12.getExecutionState());
+               assertEquals(ExecutionState.CREATED, ev21.getExecutionState());
+               assertEquals(ExecutionState.CREATED, ev22.getExecutionState());
+               
assertFalse(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].areAllPartitionsFinished());
+               
assertFalse(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[0].isConsumable());
+               
assertFalse(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[1].isConsumable());
+
+               // partitions all finished
+               ev11.getCurrentExecutionAttempt().markFinished();
+               ev12.getCurrentExecutionAttempt().markFinished();
+               assertEquals(ExecutionState.FINISHED, ev11.getExecutionState());
+               assertEquals(ExecutionState.FINISHED, ev12.getExecutionState());
+               assertEquals(ExecutionState.DEPLOYING, 
ev21.getExecutionState());
+               assertEquals(ExecutionState.DEPLOYING, 
ev22.getExecutionState());
+               
assertTrue(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].areAllPartitionsFinished());
+               
assertTrue(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[0].isConsumable());
+               
assertTrue(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[1].isConsumable());
+
+               // force the partition producer to restart
+               strategy.onTaskFailure(ev11.getCurrentExecutionAttempt(), new 
FlinkException("Fail for testing"));
+               
assertFalse(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].areAllPartitionsFinished());
+               
assertFalse(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[0].isConsumable());
+               
assertFalse(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[1].isConsumable());
+
+               // failed partition finishes again
+               ev11.getCurrentExecutionAttempt().markFinished();
+               
assertTrue(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].areAllPartitionsFinished());
+               
assertTrue(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[0].isConsumable());
+               
assertTrue(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[1].isConsumable());
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        private void verifyCheckpointRestoredAsExpected(ExecutionGraph eg) 
throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
index e20c0bd..50f3d9d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -70,16 +71,60 @@ public class IntermediateResultPartitionTest extends 
TestLogger {
                partition1.markFinished();
                assertFalse(partition1.isConsumable());
                assertFalse(partition2.isConsumable());
+               assertFalse(result.areAllPartitionsFinished());
 
                // Consumable after all partitions are FINISHED
                partition2.markFinished();
                assertTrue(partition1.isConsumable());
                assertTrue(partition2.isConsumable());
+               assertTrue(result.areAllPartitionsFinished());
 
                // Not consumable if failover happens
                result.resetForNewExecution();
                assertFalse(partition1.isConsumable());
                assertFalse(partition2.isConsumable());
+               assertFalse(result.areAllPartitionsFinished());
+       }
+
+       @Test
+       public void testBlockingPartitionResetting() throws Exception {
+               IntermediateResult result = 
createResult(ResultPartitionType.BLOCKING, 2);
+               IntermediateResultPartition partition1 = 
result.getPartitions()[0];
+               IntermediateResultPartition partition2 = 
result.getPartitions()[1];
+
+               // Not consumable on init
+               assertFalse(partition1.isConsumable());
+               assertFalse(partition2.isConsumable());
+
+               // Not consumable if partition1 is FINISHED
+               partition1.markFinished();
+               assertEquals(1,  result.getNumberOfRunningProducers());
+               assertFalse(partition1.isConsumable());
+               assertFalse(partition2.isConsumable());
+               assertFalse(result.areAllPartitionsFinished());
+
+               // Reset the result and mark partition2 FINISHED, the result 
should still not be consumable
+               result.resetForNewExecution();
+               assertEquals(2,  result.getNumberOfRunningProducers());
+               partition2.markFinished();
+               assertEquals(1,  result.getNumberOfRunningProducers());
+               assertFalse(partition1.isConsumable());
+               assertFalse(partition2.isConsumable());
+               assertFalse(result.areAllPartitionsFinished());
+
+               // Consumable after all partitions are FINISHED
+               partition1.markFinished();
+               assertEquals(0,  result.getNumberOfRunningProducers());
+               assertTrue(partition1.isConsumable());
+               assertTrue(partition2.isConsumable());
+               assertTrue(result.areAllPartitionsFinished());
+
+               // Not consumable again if failover happens
+               result.resetForNewExecution();
+               assertEquals(2,  result.getNumberOfRunningProducers());
+               assertFalse(partition1.isConsumable());
+               assertFalse(partition2.isConsumable());
+               assertFalse(result.areAllPartitionsFinished());
        }
 
        private static IntermediateResult createResult(

Reply via email to