yuqi created FLINK-11389:
----------------------------

             Summary: Incorrect use of job information when calling getSerializedTaskInformation in class TaskDeploymentDescriptor
                 Key: FLINK-11389
                 URL: https://issues.apache.org/jira/browse/FLINK-11389
             Project: Flink
          Issue Type: Bug
            Reporter: yuqi
             Fix For: 1.7.2


See TaskDeploymentDescriptor


{code:java}
@Nullable
public SerializedValue<TaskInformation> getSerializedTaskInformation() {
        if (serializedJobInformation instanceof NonOffloaded) {
                NonOffloaded<TaskInformation> jobInformation =
                        (NonOffloaded<TaskInformation>) serializedTaskInformation;
                return jobInformation.serializedValue;
        } else {
                throw new IllegalStateException(
                        "Trying to work with offloaded serialized job information.");
        }
}
{code}

The condition serializedJobInformation instanceof NonOffloaded is incorrect: 
the method should check serializedTaskInformation instead. The two fields are 
populated independently in ExecutionVertex#createDeploymentDescriptor, so one 
may be offloaded while the other is not:


{code:java}
if (jobInformationOrBlobKey.isLeft()) {
        serializedJobInformation =
                new TaskDeploymentDescriptor.NonOffloaded<>(jobInformationOrBlobKey.left());
} else {
        serializedJobInformation =
                new TaskDeploymentDescriptor.Offloaded<>(jobInformationOrBlobKey.right());
}

final Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey;

try {
        taskInformationOrBlobKey = jobVertex.getTaskInformationOrBlobKey();
} catch (IOException e) {
        throw new ExecutionGraphException(
                "Could not create a serialized JobVertexInformation for " +
                        jobVertex.getJobVertexId(), e);
}

final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation;

if (taskInformationOrBlobKey.isLeft()) {
        serializedTaskInformation =
                new TaskDeploymentDescriptor.NonOffloaded<>(taskInformationOrBlobKey.left());
} else {
        serializedTaskInformation =
                new TaskDeploymentDescriptor.Offloaded<>(taskInformationOrBlobKey.right());
}
{code}
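
A possible fix is to test the task field rather than the job field. The sketch below is self-contained and only illustrates the idea; it is not the actual Flink patch. The class OffloadCheckSketch, the nested Descriptor class, and the use of String in place of SerializedValue<TaskInformation> and PermanentBlobKey are all assumptions made so that the example compiles without Flink on the classpath.

```java
public class OffloadCheckSketch {

    /** Hypothetical stand-in for TaskDeploymentDescriptor.MaybeOffloaded. */
    static class MaybeOffloaded<T> {}

    /** Value is carried inline (not offloaded to the blob store). */
    static final class NonOffloaded<T> extends MaybeOffloaded<T> {
        final T serializedValue;

        NonOffloaded(T serializedValue) {
            this.serializedValue = serializedValue;
        }
    }

    /** Value was offloaded; only a key is kept (a String here, by assumption). */
    static final class Offloaded<T> extends MaybeOffloaded<T> {
        final String blobKey;

        Offloaded(String blobKey) {
            this.blobKey = blobKey;
        }
    }

    /** Hypothetical, reduced stand-in for TaskDeploymentDescriptor. */
    static final class Descriptor {
        private final MaybeOffloaded<String> serializedJobInformation;
        private final MaybeOffloaded<String> serializedTaskInformation;

        Descriptor(MaybeOffloaded<String> job, MaybeOffloaded<String> task) {
            this.serializedJobInformation = job;
            this.serializedTaskInformation = task;
        }

        String getSerializedTaskInformation() {
            // Check the field that is actually cast and returned.
            if (serializedTaskInformation instanceof NonOffloaded) {
                NonOffloaded<String> taskInformation =
                        (NonOffloaded<String>) serializedTaskInformation;
                return taskInformation.serializedValue;
            } else {
                throw new IllegalStateException(
                        "Trying to work with offloaded serialized task information.");
            }
        }
    }

    public static void main(String[] args) {
        // Job information offloaded, task information inline: the two states differ.
        Descriptor d = new Descriptor(
                new Offloaded<>("job-blob-key"), new NonOffloaded<>("task-info"));
        System.out.println(d.getSerializedTaskInformation());
    }
}
```

With the original condition, this mixed case (job information offloaded, task information not) would throw IllegalStateException even though the task information is available, and the opposite case would fail with a ClassCastException on the cast.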





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
