[
https://issues.apache.org/jira/browse/FLINK-10056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580784#comment-16580784
]
ASF GitHub Bot commented on FLINK-10056:
----------------------------------------
asfgit closed pull request #6490: [FLINK-10056] [test] Add
JobMasterTest#testRequestNextInputSplit
URL: https://github.com/apache/flink/pull/6490
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c47f4fd19ff..01cb2b6b099 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1655,4 +1655,9 @@ public void reportPayload(ResourceID resourceID, Void
payload) {
RestartStrategy getRestartStrategy() {
return restartStrategy;
}
+
+ @VisibleForTesting
+ ExecutionGraph getExecutionGraph() {
+ return executionGraph;
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 1d36fa5859a..0d603fc17b2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -97,13 +97,12 @@
//
------------------------------------------------------------------------
/**
- * Waits until the job has reached a certain state.
+ * Waits until the Job has reached a certain state.
*
* <p>This method is based on polling and might miss very fast state
transitions!
*/
public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus
status, long maxWaitMillis)
throws TimeoutException {
-
checkNotNull(eg);
checkNotNull(status);
checkArgument(maxWaitMillis >= 0);
@@ -118,7 +117,9 @@ public static void waitUntilJobStatus(ExecutionGraph eg,
JobStatus status, long
}
if (System.nanoTime() >= deadline) {
- throw new TimeoutException("The job did not reach
status " + status + " in time. Current status is " + eg.getState() + '.');
+ throw new TimeoutException(
+ String.format("The job did not reach status %s
in time. Current status is %s.",
+ status, eg.getState()));
}
}
@@ -129,7 +130,6 @@ public static void waitUntilJobStatus(ExecutionGraph eg,
JobStatus status, long
*/
public static void waitUntilExecutionState(Execution execution,
ExecutionState state, long maxWaitMillis)
throws TimeoutException {
-
checkNotNull(execution);
checkNotNull(state);
checkArgument(maxWaitMillis >= 0);
@@ -144,7 +144,47 @@ public static void waitUntilExecutionState(Execution
execution, ExecutionState s
}
if (System.nanoTime() >= deadline) {
- throw new TimeoutException();
+ throw new TimeoutException(
+ String.format("The execution did not reach
state %s in time. Current state is %s.",
+ state, execution.getState()));
+ }
+ }
+
+ /**
+ * Waits until the ExecutionVertex has reached a certain state.
+ *
+ * <p>This method is based on polling and might miss very fast state
transitions!
+ */
+ public static void waitUntilExecutionVertexState(ExecutionVertex
executionVertex, ExecutionState state, long maxWaitMillis)
+ throws TimeoutException {
+ checkNotNull(executionVertex);
+ checkNotNull(state);
+ checkArgument(maxWaitMillis >= 0);
+
+ // this is a poor implementation - we may want to improve it
eventually
+ final long deadline = maxWaitMillis == 0 ? Long.MAX_VALUE :
System.nanoTime() + (maxWaitMillis * 1_000_000);
+
+ while (true) {
+ Execution execution =
executionVertex.getCurrentExecutionAttempt();
+
+ if (execution == null || (execution.getState() != state
&& System.nanoTime() < deadline)) {
+ try {
+ Thread.sleep(2);
+ } catch (InterruptedException ignored) { }
+ } else {
+ break;
+ }
+
+ if (System.nanoTime() >= deadline) {
+ if (execution != null) {
+ throw new TimeoutException(
+ String.format("The execution
vertex did not reach state %s in time. Current state is %s.",
+ state,
execution.getState()));
+ } else {
+ throw new TimeoutException(
+ "Cannot get current execution
attempt of " + executionVertex + '.');
+ }
+ }
}
}
@@ -201,7 +241,6 @@ public static void waitForAllExecutionsPredicate(
public static void waitUntilFailoverRegionState(FailoverRegion region,
JobStatus status, long maxWaitMillis)
throws TimeoutException {
-
checkNotNull(region);
checkNotNull(status);
checkArgument(maxWaitMillis >= 0);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 891ff82c413..578c9066e95 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -24,8 +24,12 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
@@ -46,6 +50,9 @@
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -60,6 +67,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -84,6 +92,7 @@
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
@@ -113,6 +122,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -678,6 +688,133 @@ public void
testResourceManagerConnectionAfterRegainingLeadership() throws Excep
}
}
+ @Test
+ public void testRequestNextInputSplit() throws Exception {
+ // build one node JobGraph
+ InputSplitSource<TestingInputSplit> inputSplitSource = new
TestingInputSplitSource();
+
+ JobVertex source = new JobVertex("vertex1");
+ source.setParallelism(1);
+ source.setInputSplitSource(inputSplitSource);
+ source.setInvokableClass(AbstractInvokable.class);
+
+ final JobGraph jobGraph = new JobGraph(source);
+ jobGraph.setAllowQueuedScheduling(true);
+
+
configuration.setLong(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+
configuration.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0
s");
+
+ final JobManagerSharedServices jobManagerSharedServices =
+ new TestingJobManagerSharedServicesBuilder()
+
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+ .build();
+
+ final JobMaster jobMaster = createJobMaster(
+ configuration,
+ jobGraph,
+ haServices,
+ jobManagerSharedServices);
+
+ CompletableFuture<Acknowledge> startFuture =
jobMaster.start(jobMasterId, testingTimeout);
+
+ try {
+ // wait for the start to complete
+ startFuture.get(testingTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
+
+ final JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+ ExecutionGraph eg = jobMaster.getExecutionGraph();
+ ExecutionVertex ev =
eg.getAllExecutionVertices().iterator().next();
+
+ SerializedInputSplit serializedInputSplit1 =
jobMasterGateway
+ .requestNextInputSplit(
+ source.getID(),
+
ev.getCurrentExecutionAttempt().getAttemptId())
+ .get(1L, TimeUnit.SECONDS);
+ InputSplit inputSplit1 = InstantiationUtil
+ .deserializeObject(
+
serializedInputSplit1.getInputSplitData(),
+ ClassLoader.getSystemClassLoader());
+ assertEquals(0, inputSplit1.getSplitNumber());
+
+ SerializedInputSplit serializedInputSplit2 =
jobMasterGateway
+ .requestNextInputSplit(
+ source.getID(),
+
ev.getCurrentExecutionAttempt().getAttemptId())
+ .get(1L, TimeUnit.SECONDS);
+ InputSplit inputSplit2 = InstantiationUtil
+ .deserializeObject(
+
serializedInputSplit2.getInputSplitData(),
+ ClassLoader.getSystemClassLoader());
+ assertEquals(1, inputSplit2.getSplitNumber());
+
+
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev,
ExecutionState.SCHEDULED, 2000L);
+
+ eg.failGlobal(new Exception("Testing exception"));
+
+
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev,
ExecutionState.SCHEDULED, 2000L);
+
+ SerializedInputSplit serializedInputSplit3 =
jobMasterGateway
+ .requestNextInputSplit(
+ source.getID(),
+
ev.getCurrentExecutionAttempt().getAttemptId())
+ .get(1L, TimeUnit.SECONDS);
+ InputSplit inputSplit3 = InstantiationUtil
+ .deserializeObject(
+
serializedInputSplit3.getInputSplitData(),
+ ClassLoader.getSystemClassLoader());
+ assertEquals(0, inputSplit3.getSplitNumber());
+
+ SerializedInputSplit serializedInputSplit4 =
jobMasterGateway
+ .requestNextInputSplit(
+ source.getID(),
+
ev.getCurrentExecutionAttempt().getAttemptId())
+ .get(1L, TimeUnit.SECONDS);
+ InputSplit inputSplit4 = InstantiationUtil
+ .deserializeObject(
+
serializedInputSplit4.getInputSplitData(),
+ ClassLoader.getSystemClassLoader());
+ assertEquals(1, inputSplit4.getSplitNumber());
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster,
testingTimeout);
+ }
+ }
+
+ private static final class TestingInputSplitSource implements
InputSplitSource<TestingInputSplit> {
+ @Override
+ public TestingInputSplit[] createInputSplits(int minNumSplits) {
+ return new TestingInputSplit[0];
+ }
+
+ @Override
+ public InputSplitAssigner
getInputSplitAssigner(TestingInputSplit[] inputSplits) {
+ return new TestingInputSplitAssigner();
+ }
+ }
+
+ private static final class TestingInputSplitAssigner implements
InputSplitAssigner {
+
+ private int splitIndex = 0;
+
+ @Override
+ public InputSplit getNextInputSplit(String host, int taskId){
+ return new TestingInputSplit(splitIndex++);
+ }
+ }
+
+ private static final class TestingInputSplit implements InputSplit {
+
+ private final int splitNumber;
+
+ TestingInputSplit(int number) {
+ this.splitNumber = number;
+ }
+
+ public int getSplitNumber() {
+ return splitNumber;
+ }
+ }
+
/**
* Tests the {@link
JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
* call for a finished result partition.
@@ -708,9 +845,9 @@ public void testRequestPartitionState() throws Exception {
final CompletableFuture<TaskDeploymentDescriptor>
tddFuture = new CompletableFuture<>();
final TestingTaskExecutorGateway
testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
- tddFuture.complete(taskDeploymentDescriptor);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- })
+
tddFuture.complete(taskDeploymentDescriptor);
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ })
.createTestingTaskExecutorGateway();
rpcService.registerGateway(testingTaskExecutorGateway.getAddress(),
testingTaskExecutorGateway);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Add testRequestNextInputSplit
> -----------------------------
>
> Key: FLINK-10056
> URL: https://issues.apache.org/jira/browse/FLINK-10056
> Project: Flink
> Issue Type: Improvement
> Components: JobManager, Tests
> Affects Versions: 1.5.0
> Reporter: 陈梓立
> Assignee: 陈梓立
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add testRequestNextInputSplit to make sure JobMaster#requestNextInputSplit
> works as expected.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)