This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 446145f5ca0cd92c64a6944c19ca1ab53104030f
Author: park.yq <[email protected]>
AuthorDate: Fri Jan 18 16:40:30 2019 +0800

    [FLINK-11389] Fix incorrect use of job information when calling 
getSerializedTaskInformation in class TaskDeploymentDescriptor
---
 .../runtime/deployment/TaskDeploymentDescriptor.java |  6 +++---
 .../deployment/TaskDeploymentDescriptorTest.java     | 20 ++++++++++++++++++++
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 4f5b231..bb038eb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -208,10 +208,10 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
         */
        @Nullable
        public SerializedValue<TaskInformation> getSerializedTaskInformation() {
-               if (serializedJobInformation instanceof NonOffloaded) {
-                       NonOffloaded<TaskInformation> jobInformation =
+               if (serializedTaskInformation instanceof NonOffloaded) {
+                       NonOffloaded<TaskInformation> taskInformation =
                                (NonOffloaded<TaskInformation>) 
serializedTaskInformation;
-                       return jobInformation.serializedValue;
+                       return taskInformation.serializedValue;
                } else {
                        throw new IllegalStateException(
                                "Trying to work with offloaded serialized job 
information.");
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index e20d34b..22e943b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -42,6 +42,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -110,6 +111,25 @@ public class TaskDeploymentDescriptorTest {
                        
assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), 
copy.getTaskRestore().getTaskStateSnapshot());
                        assertEquals(orig.getProducedPartitions(), 
copy.getProducedPartitions());
                        assertEquals(orig.getInputGates(), 
copy.getInputGates());
+
+                       final TaskDeploymentDescriptor 
testOffLoadedTaskInformation = new TaskDeploymentDescriptor(
+                               jobID,
+                               new 
TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+                               new TaskDeploymentDescriptor.Offloaded<>(new 
PermanentBlobKey()),
+                               execId,
+                               allocationId,
+                               indexInSubtaskGroup,
+                               attemptNumber,
+                               targetSlotNumber,
+                               taskRestore,
+                               producedResults,
+                               inputGates);
+                       try {
+                               
testOffLoadedTaskInformation.getSerializedTaskInformation();
+                       } catch (Exception e) {
+                               assertTrue(e instanceof IllegalStateException);
+                       }
+
                }
                catch (Exception e) {
                        e.printStackTrace();

Reply via email to