This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 446145f5ca0cd92c64a6944c19ca1ab53104030f Author: park.yq <[email protected]> AuthorDate: Fri Jan 18 16:40:30 2019 +0800 [FLINK-11389] Fix incorrect use of job information when calling getSerializedTaskInformation in class TaskDeploymentDescriptor --- .../runtime/deployment/TaskDeploymentDescriptor.java | 6 +++--- .../deployment/TaskDeploymentDescriptorTest.java | 20 ++++++++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index 4f5b231..bb038eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -208,10 +208,10 @@ public final class TaskDeploymentDescriptor implements Serializable { */ @Nullable public SerializedValue<TaskInformation> getSerializedTaskInformation() { - if (serializedJobInformation instanceof NonOffloaded) { - NonOffloaded<TaskInformation> jobInformation = + if (serializedTaskInformation instanceof NonOffloaded) { + NonOffloaded<TaskInformation> taskInformation = (NonOffloaded<TaskInformation>) serializedTaskInformation; - return jobInformation.serializedValue; + return taskInformation.serializedValue; } else { throw new IllegalStateException( "Trying to work with offloaded serialized job information."); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java index e20d34b..22e943b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java @@ -42,6 +42,7 @@ import 
java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -110,6 +111,25 @@ public class TaskDeploymentDescriptorTest { assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot()); assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions()); assertEquals(orig.getInputGates(), copy.getInputGates()); + + final TaskDeploymentDescriptor testOffLoadedTaskInformation = new TaskDeploymentDescriptor( + jobID, + new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation), + new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()), + execId, + allocationId, + indexInSubtaskGroup, + attemptNumber, + targetSlotNumber, + taskRestore, + producedResults, + inputGates); + try { + testOffLoadedTaskInformation.getSerializedTaskInformation(); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + } + } catch (Exception e) { e.printStackTrace();
