This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit acf228cc5c88872428ccb10296aa1646837c16c2
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Jan 31 11:39:55 2019 +0100

    [FLINK-11389][tests] Refactor TaskDeploymentDescriptorTest
    
    This closes #7532.
---
 .../deployment/TaskDeploymentDescriptorTest.java   | 178 +++++++++++----------
 1 file changed, 93 insertions(+), 85 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 22e943b..a617ce1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -33,107 +33,115 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link TaskDeploymentDescriptor}.
  */
-public class TaskDeploymentDescriptorTest {
+public class TaskDeploymentDescriptorTest extends TestLogger {
+
+       private static final JobID jobID = new JobID();
+       private static final JobVertexID vertexID = new JobVertexID();
+       private static final ExecutionAttemptID execId = new 
ExecutionAttemptID();
+       private static final AllocationID allocationId = new AllocationID();
+       private static final String jobName = "job name";
+       private static final String taskName = "task name";
+       private static final int numberOfKeyGroups = 1;
+       private static final int indexInSubtaskGroup = 0;
+       private static final int currentNumberOfSubtasks = 1;
+       private static final int attemptNumber = 0;
+       private static final Configuration jobConfiguration = new 
Configuration();
+       private static final Configuration taskConfiguration = new 
Configuration();
+       private static final Class<? extends AbstractInvokable> invokableClass 
= BatchTask.class;
+       private static final List<ResultPartitionDeploymentDescriptor> 
producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
+       private static final List<InputGateDeploymentDescriptor> inputGates = 
new ArrayList<InputGateDeploymentDescriptor>(0);
+       private static final List<PermanentBlobKey> requiredJars = new 
ArrayList<>(0);
+       private static final List<URL> requiredClasspaths = new ArrayList<>(0);
+       private static final int targetSlotNumber = 47;
+       private static final TaskStateSnapshot taskStateHandles = new 
TaskStateSnapshot();
+       private static final JobManagerTaskRestore taskRestore = new 
JobManagerTaskRestore(1L, taskStateHandles);
+
+       private final SerializedValue<ExecutionConfig> executionConfig = new 
SerializedValue<>(new ExecutionConfig());
+       private final SerializedValue<JobInformation> serializedJobInformation 
= new SerializedValue<>(new JobInformation(
+               jobID, jobName, executionConfig, jobConfiguration, 
requiredJars, requiredClasspaths));
+       private final SerializedValue<TaskInformation> 
serializedJobVertexInformation = new SerializedValue<>(new TaskInformation(
+               vertexID, taskName, currentNumberOfSubtasks, numberOfKeyGroups, 
invokableClass.getName(), taskConfiguration));
+
+       public TaskDeploymentDescriptorTest() throws IOException {}
+
        @Test
-       public void testSerialization() {
-               try {
-                       final JobID jobID = new JobID();
-                       final JobVertexID vertexID = new JobVertexID();
-                       final ExecutionAttemptID execId = new 
ExecutionAttemptID();
-                       final AllocationID allocationId = new AllocationID();
-                       final String jobName = "job name";
-                       final String taskName = "task name";
-                       final int numberOfKeyGroups = 1;
-                       final int indexInSubtaskGroup = 0;
-                       final int currentNumberOfSubtasks = 1;
-                       final int attemptNumber = 0;
-                       final Configuration jobConfiguration = new 
Configuration();
-                       final Configuration taskConfiguration = new 
Configuration();
-                       final Class<? extends AbstractInvokable> invokableClass 
= BatchTask.class;
-                       final List<ResultPartitionDeploymentDescriptor> 
producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
-                       final List<InputGateDeploymentDescriptor> inputGates = 
new ArrayList<InputGateDeploymentDescriptor>(0);
-                       final List<PermanentBlobKey> requiredJars = new 
ArrayList<>(0);
-                       final List<URL> requiredClasspaths = new ArrayList<>(0);
-                       final SerializedValue<ExecutionConfig> executionConfig 
= new SerializedValue<>(new ExecutionConfig());
-                       final SerializedValue<JobInformation> 
serializedJobInformation = new SerializedValue<>(new JobInformation(
-                               jobID, jobName, executionConfig, 
jobConfiguration, requiredJars, requiredClasspaths));
-                       final SerializedValue<TaskInformation> 
serializedJobVertexInformation = new SerializedValue<>(new TaskInformation(
-                               vertexID, taskName, currentNumberOfSubtasks, 
numberOfKeyGroups, invokableClass.getName(), taskConfiguration));
-                       final int targetSlotNumber = 47;
-                       final TaskStateSnapshot taskStateHandles = new 
TaskStateSnapshot();
-                       final JobManagerTaskRestore taskRestore = new 
JobManagerTaskRestore(1L, taskStateHandles);
-
-                       final TaskDeploymentDescriptor orig = new 
TaskDeploymentDescriptor(
-                               jobID,
-                               new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
-                               new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation),
-                               execId,
-                               allocationId,
-                               indexInSubtaskGroup,
-                               attemptNumber,
-                               targetSlotNumber,
-                               taskRestore,
-                               producedResults,
-                               inputGates);
-
-                       final TaskDeploymentDescriptor copy = 
CommonTestUtils.createCopySerializable(orig);
-
-                       assertFalse(orig.getSerializedJobInformation() == 
copy.getSerializedJobInformation());
-                       assertFalse(orig.getSerializedTaskInformation() == 
copy.getSerializedTaskInformation());
-                       assertFalse(orig.getExecutionAttemptId() == 
copy.getExecutionAttemptId());
-                       assertFalse(orig.getTaskRestore() == 
copy.getTaskRestore());
-                       assertFalse(orig.getProducedPartitions() == 
copy.getProducedPartitions());
-                       assertFalse(orig.getInputGates() == 
copy.getInputGates());
-
-                       assertEquals(orig.getSerializedJobInformation(), 
copy.getSerializedJobInformation());
-                       assertEquals(orig.getSerializedTaskInformation(), 
copy.getSerializedTaskInformation());
-                       assertEquals(orig.getExecutionAttemptId(), 
copy.getExecutionAttemptId());
-                       assertEquals(orig.getAllocationId(), 
copy.getAllocationId());
-                       assertEquals(orig.getSubtaskIndex(), 
copy.getSubtaskIndex());
-                       assertEquals(orig.getAttemptNumber(), 
copy.getAttemptNumber());
-                       assertEquals(orig.getTargetSlotNumber(), 
copy.getTargetSlotNumber());
-                       
assertEquals(orig.getTaskRestore().getRestoreCheckpointId(), 
copy.getTaskRestore().getRestoreCheckpointId());
-                       
assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), 
copy.getTaskRestore().getTaskStateSnapshot());
-                       assertEquals(orig.getProducedPartitions(), 
copy.getProducedPartitions());
-                       assertEquals(orig.getInputGates(), 
copy.getInputGates());
-
-                       final TaskDeploymentDescriptor 
testOffLoadedTaskInformation = new TaskDeploymentDescriptor(
-                               jobID,
-                               new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
-                               new TaskDeploymentDescriptor.Offloaded<>(new 
PermanentBlobKey()),
-                               execId,
-                               allocationId,
-                               indexInSubtaskGroup,
-                               attemptNumber,
-                               targetSlotNumber,
-                               taskRestore,
-                               producedResults,
-                               inputGates);
-                       try {
-                               
testOffLoadedTaskInformation.getSerializedTaskInformation();
-                       } catch (Exception e) {
-                               assertTrue(e instanceof IllegalStateException);
-                       }
+       public void testSerialization() throws Exception {
+               final TaskDeploymentDescriptor orig = 
createTaskDeploymentDescriptor(
+                       new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+                       new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobVertexInformation));
 
+               final TaskDeploymentDescriptor copy = 
CommonTestUtils.createCopySerializable(orig);
+
+               assertFalse(orig.getSerializedJobInformation() == 
copy.getSerializedJobInformation());
+               assertFalse(orig.getSerializedTaskInformation() == 
copy.getSerializedTaskInformation());
+               assertFalse(orig.getExecutionAttemptId() == 
copy.getExecutionAttemptId());
+               assertFalse(orig.getTaskRestore() == copy.getTaskRestore());
+               assertFalse(orig.getProducedPartitions() == 
copy.getProducedPartitions());
+               assertFalse(orig.getInputGates() == copy.getInputGates());
+
+               assertEquals(orig.getSerializedJobInformation(), 
copy.getSerializedJobInformation());
+               assertEquals(orig.getSerializedTaskInformation(), 
copy.getSerializedTaskInformation());
+               assertEquals(orig.getExecutionAttemptId(), 
copy.getExecutionAttemptId());
+               assertEquals(orig.getAllocationId(), copy.getAllocationId());
+               assertEquals(orig.getSubtaskIndex(), copy.getSubtaskIndex());
+               assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
+               assertEquals(orig.getTargetSlotNumber(), 
copy.getTargetSlotNumber());
+               assertEquals(orig.getTaskRestore().getRestoreCheckpointId(), 
copy.getTaskRestore().getRestoreCheckpointId());
+               assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), 
copy.getTaskRestore().getTaskStateSnapshot());
+               assertEquals(orig.getProducedPartitions(), 
copy.getProducedPartitions());
+               assertEquals(orig.getInputGates(), copy.getInputGates());
+       }
+
+       @Test
+       public void testOffLoadedAndNonOffLoadedPayload() {
+               final TaskDeploymentDescriptor taskDeploymentDescriptor = 
createTaskDeploymentDescriptor(
+                       new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+                       new TaskDeploymentDescriptor.Offloaded<>(new 
PermanentBlobKey()));
+
+               SerializedValue<JobInformation> actualSerializedJobInformation 
= taskDeploymentDescriptor.getSerializedJobInformation();
+               assertThat(actualSerializedJobInformation, 
is(serializedJobInformation));
+
+               try {
+                       taskDeploymentDescriptor.getSerializedTaskInformation();
+                       fail("Expected to fail since the task information should be offloaded.");
+               } catch (IllegalStateException expected) {
+                       // expected
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+       }
+
+       @Nonnull
+       private TaskDeploymentDescriptor createTaskDeploymentDescriptor(TaskDeploymentDescriptor.MaybeOffloaded<JobInformation> jobInformation, TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> taskInformation) {
+               return new TaskDeploymentDescriptor(
+                       jobID,
+                       jobInformation,
+                       taskInformation,
+                       execId,
+                       allocationId,
+                       indexInSubtaskGroup,
+                       attemptNumber,
+                       targetSlotNumber,
+                       taskRestore,
+                       producedResults,
+                       inputGates);
        }
 }

Reply via email to