This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch pull/8158/head in repository https://gitbox.apache.org/repos/asf/flink.git
commit e8e24c9993badc7a2a1752721bb19ae507ae0ff4 Author: zhuzhu.zz <[email protected]> AuthorDate: Fri Apr 12 17:31:54 2019 +0800 [FLINK-12131] Adjust IntermediateResult/IntermediateResultPartition status properly on ExecutionVertex reset 1. Add a test for exact running producer counter verification 2. Add back IntermediateResult.resetForNewExecution for tests 3. Refine comments This closes #8158. --- .../runtime/executiongraph/ExecutionJobVertex.java | 5 -- .../runtime/executiongraph/ExecutionVertex.java | 5 ++ .../runtime/executiongraph/IntermediateResult.java | 12 +++- .../IntermediateResultPartition.java | 5 ++ .../runtime/executiongraph/FailoverRegionTest.java | 77 ++++++++++++++++++++++ .../IntermediateResultPartitionTest.java | 45 +++++++++++++ 6 files changed, 143 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 5a3e7f2..6b1887c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -600,11 +600,6 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable catch (Throwable t) { throw new RuntimeException("Re-creating the input split assigner failed: " + t.getMessage(), t); } - - // Reset intermediate results - for (IntermediateResult result : producedDataSets) { - result.resetForNewExecution(); - } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index d94cad9..86fb0ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -634,6 +634,11 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi getExecutionGraph().vertexUnFinished(); } + // reset the intermediate results + for (IntermediateResultPartition resultPartition : resultPartitions.values()) { + resultPartition.resetForNewExecution(); + } + return newExecution; } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java index 6e1d9ba..eb316f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; @@ -154,13 +155,22 @@ public class IntermediateResult { return connectionIndex; } + @VisibleForTesting void resetForNewExecution() { - this.numberOfRunningProducers.set(numParallelProducers); for (IntermediateResultPartition partition : partitions) { partition.resetForNewExecution(); } } + @VisibleForTesting + int getNumberOfRunningProducers() { + return numberOfRunningProducers.get(); + } + + int incrementNumberOfRunningProducersAndGetRemaining() { + return numberOfRunningProducers.incrementAndGet(); + } + int decrementNumberOfRunningProducersAndGetRemaining() { return numberOfRunningProducers.decrementAndGet(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java index d4e85cf..5cdd376 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java @@ -86,6 +86,11 @@ public class IntermediateResultPartition { } void resetForNewExecution() { + if (getResultType().isBlocking() && hasDataProduced) { + // A BLOCKING result partition with data produced means it is finished + // Need to add the running producer count of the result on resetting it + totalResult.incrementNumberOfRunningProducersAndGetRemaining(); + } hasDataProduced = false; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java index e555704..55890d7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java @@ -64,6 +64,7 @@ import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.junit.Ignore; @@ -449,6 +450,82 @@ public class FailoverRegionTest extends TestLogger { assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev1).getState()); } + @Test + public void testStatusResettingOnRegionFailover() throws Exception { + final JobID jobId = new JobID(); + final String jobName = "Test Job Sample Name"; + + final SlotProvider slotProvider = new SimpleSlotProvider(jobId, 20); + + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + + v1.setParallelism(2); + v2.setParallelism(2); + + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + List<JobVertex> ordered = Arrays.asList(v1, v2); + + ExecutionGraph eg = new ExecutionGraph( + new DummyJobInformation( + jobId, + jobName), + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + AkkaUtils.getDefaultTimeout(), + new InfiniteDelayRestartStrategy(10), + new FailoverPipelinedRegionWithDirectExecutor(), + slotProvider); + + eg.attachJobGraph(ordered); + eg.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread()); + + RestartPipelinedRegionStrategy strategy = (RestartPipelinedRegionStrategy)eg.getFailoverStrategy(); + + ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; + ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1]; + ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; + ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1]; + + eg.scheduleForExecution(); + + // initial state + assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState()); + assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState()); + assertEquals(ExecutionState.CREATED, ev21.getExecutionState()); + assertEquals(ExecutionState.CREATED, ev22.getExecutionState()); + assertFalse(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].areAllPartitionsFinished()); + assertFalse(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[0].isConsumable()); + assertFalse(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[1].isConsumable()); + + // partitions all finished + ev11.getCurrentExecutionAttempt().markFinished(); + ev12.getCurrentExecutionAttempt().markFinished(); + assertEquals(ExecutionState.FINISHED, ev11.getExecutionState()); + assertEquals(ExecutionState.FINISHED, ev12.getExecutionState()); + assertEquals(ExecutionState.DEPLOYING, ev21.getExecutionState()); + assertEquals(ExecutionState.DEPLOYING, ev22.getExecutionState()); + assertTrue(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].areAllPartitionsFinished()); + assertTrue(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[0].isConsumable()); + assertTrue(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[1].isConsumable()); + + // force the partition producer to restart + strategy.onTaskFailure(ev11.getCurrentExecutionAttempt(), new FlinkException("Fail for testing")); + assertFalse(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].areAllPartitionsFinished()); + assertFalse(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[0].isConsumable()); + assertFalse(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[1].isConsumable()); + + // failed partition finishes again + ev11.getCurrentExecutionAttempt().markFinished(); + assertTrue(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].areAllPartitionsFinished()); + assertTrue(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[0].isConsumable()); + assertTrue(eg.getJobVertex(v1.getID()).getProducedDataSets()[0].getPartitions()[1].isConsumable()); + } + // -------------------------------------------------------------------------------------------- private void verifyCheckpointRestoredAsExpected(ExecutionGraph eg) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java index e20c0bd..50f3d9d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java @@ -27,6 +27,7 @@ import org.apache.flink.util.TestLogger; import org.junit.Test; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -70,16 +71,60 @@ public class IntermediateResultPartitionTest extends TestLogger { partition1.markFinished(); assertFalse(partition1.isConsumable()); assertFalse(partition2.isConsumable()); + assertFalse(result.areAllPartitionsFinished()); // Consumable after all partitions are FINISHED partition2.markFinished(); assertTrue(partition1.isConsumable()); assertTrue(partition2.isConsumable()); + assertTrue(result.areAllPartitionsFinished()); // Not consumable if failover happens result.resetForNewExecution(); assertFalse(partition1.isConsumable()); assertFalse(partition2.isConsumable()); + assertFalse(result.areAllPartitionsFinished()); + } + + @Test + public void testBlockingPartitionResetting() throws Exception { + IntermediateResult result = createResult(ResultPartitionType.BLOCKING, 2); + IntermediateResultPartition partition1 = result.getPartitions()[0]; + IntermediateResultPartition partition2 = result.getPartitions()[1]; + + // Not consumable on init + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + + // Not consumable if partition1 is FINISHED + partition1.markFinished(); + assertEquals(1, result.getNumberOfRunningProducers()); + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + assertFalse(result.areAllPartitionsFinished()); + + // Reset the result and mark partition2 FINISHED, the result should still not be consumable + result.resetForNewExecution(); + assertEquals(2, result.getNumberOfRunningProducers()); + partition2.markFinished(); + assertEquals(1, result.getNumberOfRunningProducers()); + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + assertFalse(result.areAllPartitionsFinished()); + + // Consumable after all partitions are FINISHED + partition1.markFinished(); + assertEquals(0, result.getNumberOfRunningProducers()); + assertTrue(partition1.isConsumable()); + assertTrue(partition2.isConsumable()); + assertTrue(result.areAllPartitionsFinished()); + + // Not consumable again if failover happens + result.resetForNewExecution(); + assertEquals(2, result.getNumberOfRunningProducers()); + assertFalse(partition1.isConsumable()); + assertFalse(partition2.isConsumable()); + assertFalse(result.areAllPartitionsFinished()); } private static IntermediateResult createResult(
