This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.7 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c39192a21098882338f0dcd9636f9241814099ce
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Jan 31 11:39:55 2019 +0100

    [FLINK-11389][tests] Refactor TaskDeploymentDescriptorTest

    This closes #7532.
---
 .../deployment/TaskDeploymentDescriptorTest.java   | 178 +++++++++++---------
 1 file changed, 93 insertions(+), 85 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 22e943b..a617ce1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -33,107 +33,115 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link TaskDeploymentDescriptor}.
  */
-public class TaskDeploymentDescriptorTest {
+public class TaskDeploymentDescriptorTest extends TestLogger {
+
+	private static final JobID jobID = new JobID();
+	private static final JobVertexID vertexID = new JobVertexID();
+	private static final ExecutionAttemptID execId = new ExecutionAttemptID();
+	private static final AllocationID allocationId = new AllocationID();
+	private static final String jobName = "job name";
+	private static final String taskName = "task name";
+	private static final int numberOfKeyGroups = 1;
+	private static final int indexInSubtaskGroup = 0;
+	private static final int currentNumberOfSubtasks = 1;
+	private static final int attemptNumber = 0;
+	private static final Configuration jobConfiguration = new Configuration();
+	private static final Configuration taskConfiguration = new Configuration();
+	private static final Class<? extends AbstractInvokable> invokableClass = BatchTask.class;
+	private static final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
+	private static final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
+	private static final List<PermanentBlobKey> requiredJars = new ArrayList<>(0);
+	private static final List<URL> requiredClasspaths = new ArrayList<>(0);
+	private static final int targetSlotNumber = 47;
+	private static final TaskStateSnapshot taskStateHandles = new TaskStateSnapshot();
+	private static final JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(1L, taskStateHandles);
+
+	private final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
+	private final SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(new JobInformation(
+		jobID, jobName, executionConfig, jobConfiguration, requiredJars, requiredClasspaths));
+	private final SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(new TaskInformation(
+		vertexID, taskName, currentNumberOfSubtasks, numberOfKeyGroups, invokableClass.getName(), taskConfiguration));
+
+	public TaskDeploymentDescriptorTest() throws IOException {}
+
 	@Test
-	public void testSerialization() {
-		try {
-			final JobID jobID = new JobID();
-			final JobVertexID vertexID = new JobVertexID();
-			final ExecutionAttemptID execId = new ExecutionAttemptID();
-			final AllocationID allocationId = new AllocationID();
-			final String jobName = "job name";
-			final String taskName = "task name";
-			final int numberOfKeyGroups = 1;
-			final int indexInSubtaskGroup = 0;
-			final int currentNumberOfSubtasks = 1;
-			final int attemptNumber = 0;
-			final Configuration jobConfiguration = new Configuration();
-			final Configuration taskConfiguration = new Configuration();
-			final Class<? extends AbstractInvokable> invokableClass = BatchTask.class;
-			final List<ResultPartitionDeploymentDescriptor> producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
-			final List<InputGateDeploymentDescriptor> inputGates = new ArrayList<InputGateDeploymentDescriptor>(0);
-			final List<PermanentBlobKey> requiredJars = new ArrayList<>(0);
-			final List<URL> requiredClasspaths = new ArrayList<>(0);
-			final SerializedValue<ExecutionConfig> executionConfig = new SerializedValue<>(new ExecutionConfig());
-			final SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(new JobInformation(
-				jobID, jobName, executionConfig, jobConfiguration, requiredJars, requiredClasspaths));
-			final SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(new TaskInformation(
-				vertexID, taskName, currentNumberOfSubtasks, numberOfKeyGroups, invokableClass.getName(), taskConfiguration));
-			final int targetSlotNumber = 47;
-			final TaskStateSnapshot taskStateHandles = new TaskStateSnapshot();
-			final JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(1L, taskStateHandles);
-
-			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(
-				jobID,
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
-				execId,
-				allocationId,
-				indexInSubtaskGroup,
-				attemptNumber,
-				targetSlotNumber,
-				taskRestore,
-				producedResults,
-				inputGates);
-
-			final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
-
-			assertFalse(orig.getSerializedJobInformation() == copy.getSerializedJobInformation());
-			assertFalse(orig.getSerializedTaskInformation() == copy.getSerializedTaskInformation());
-			assertFalse(orig.getExecutionAttemptId() == copy.getExecutionAttemptId());
-			assertFalse(orig.getTaskRestore() == copy.getTaskRestore());
-			assertFalse(orig.getProducedPartitions() == copy.getProducedPartitions());
-			assertFalse(orig.getInputGates() == copy.getInputGates());
-
-			assertEquals(orig.getSerializedJobInformation(), copy.getSerializedJobInformation());
-			assertEquals(orig.getSerializedTaskInformation(), copy.getSerializedTaskInformation());
-			assertEquals(orig.getExecutionAttemptId(), copy.getExecutionAttemptId());
-			assertEquals(orig.getAllocationId(), copy.getAllocationId());
-			assertEquals(orig.getSubtaskIndex(), copy.getSubtaskIndex());
-			assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
-			assertEquals(orig.getTargetSlotNumber(), copy.getTargetSlotNumber());
-			assertEquals(orig.getTaskRestore().getRestoreCheckpointId(), copy.getTaskRestore().getRestoreCheckpointId());
-			assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot());
-			assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
-			assertEquals(orig.getInputGates(), copy.getInputGates());
-
-			final TaskDeploymentDescriptor testOffLoadedTaskInformation = new TaskDeploymentDescriptor(
-				jobID,
-				new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
-				new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()),
-				execId,
-				allocationId,
-				indexInSubtaskGroup,
-				attemptNumber,
-				targetSlotNumber,
-				taskRestore,
-				producedResults,
-				inputGates);
-			try {
-				testOffLoadedTaskInformation.getSerializedTaskInformation();
-			} catch (Exception e) {
-				assertTrue(e instanceof IllegalStateException);
-			}
+	public void testSerialization() throws Exception {
+		final TaskDeploymentDescriptor orig = createTaskDeploymentDescriptor(
+			new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+			new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation));
+		final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
+
+		assertFalse(orig.getSerializedJobInformation() == copy.getSerializedJobInformation());
+		assertFalse(orig.getSerializedTaskInformation() == copy.getSerializedTaskInformation());
+		assertFalse(orig.getExecutionAttemptId() == copy.getExecutionAttemptId());
+		assertFalse(orig.getTaskRestore() == copy.getTaskRestore());
+		assertFalse(orig.getProducedPartitions() == copy.getProducedPartitions());
+		assertFalse(orig.getInputGates() == copy.getInputGates());
+
+		assertEquals(orig.getSerializedJobInformation(), copy.getSerializedJobInformation());
+		assertEquals(orig.getSerializedTaskInformation(), copy.getSerializedTaskInformation());
+		assertEquals(orig.getExecutionAttemptId(), copy.getExecutionAttemptId());
+		assertEquals(orig.getAllocationId(), copy.getAllocationId());
+		assertEquals(orig.getSubtaskIndex(), copy.getSubtaskIndex());
+		assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
+		assertEquals(orig.getTargetSlotNumber(), copy.getTargetSlotNumber());
+		assertEquals(orig.getTaskRestore().getRestoreCheckpointId(), copy.getTaskRestore().getRestoreCheckpointId());
+		assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot());
+		assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
+		assertEquals(orig.getInputGates(), copy.getInputGates());
+	}
+
+	@Test
+	public void testOffLoadedAndNonOffLoadedPayload() {
+		final TaskDeploymentDescriptor taskDeploymentDescriptor = createTaskDeploymentDescriptor(
+			new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+			new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()));
+
+		SerializedValue<JobInformation> actualSerializedJobInformation = taskDeploymentDescriptor.getSerializedJobInformation();
+		assertThat(actualSerializedJobInformation, is(serializedJobInformation));
+
+		try {
+			taskDeploymentDescriptor.getSerializedTaskInformation();
+			fail("Expected to fail since the task information should be offloaded.");
+		} catch (IllegalStateException expected) {
+			// expected
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	}
+
+	@Nonnull
+	private TaskDeploymentDescriptor createTaskDeploymentDescriptor(TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> jobInformation, TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> taskInformation) {
+		return new TaskDeploymentDescriptor(
+			jobID,
+			jobInformation,
+			taskInformation,
+			execId,
+			allocationId,
+			indexInSubtaskGroup,
+			attemptNumber,
+			targetSlotNumber,
+			taskRestore,
+			producedResults,
+			inputGates);
 	}
 }
