yuqi created FLINK-11389: ---------------------------- Summary: Incorrectly use job information when call getSerializedTaskInformation in class TaskDeploymentDescriptor Key: FLINK-11389 URL: https://issues.apache.org/jira/browse/FLINK-11389 Project: Flink Issue Type: Bug Reporter: yuqi Fix For: 1.7.2
See TaskDeploymentDescriptor {code:java} @Nullable public SerializedValue<TaskInformation> getSerializedTaskInformation() { if (serializedJobInformation instanceof NonOffloaded) { NonOffloaded<TaskInformation> jobInformation = (NonOffloaded<TaskInformation>) serializedTaskInformation; return jobInformation.serializedValue; } else { throw new IllegalStateException( "Trying to work with offloaded serialized job information."); } } {code} the condition serializedJobInformation instanceof NonOffloaded is not correctly, as serializedJobInformation and serializedTaskInformation are passed from ExecutionVertex#createDeploymentDescriptor {code:java} if (jobInformationOrBlobKey.isLeft()) { serializedJobInformation = new TaskDeploymentDescriptor.NonOffloaded<>(jobInformationOrBlobKey.left()); } else { serializedJobInformation = new TaskDeploymentDescriptor.Offloaded<>(jobInformationOrBlobKey.right()); } final Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey; try { taskInformationOrBlobKey = jobVertex.getTaskInformationOrBlobKey(); } catch (IOException e) { throw new ExecutionGraphException( "Could not create a serialized JobVertexInformation for " + jobVertex.getJobVertexId(), e); } final TaskDeploymentDescriptor.MaybeOffloaded<TaskInformation> serializedTaskInformation; if (taskInformationOrBlobKey.isLeft()) { serializedTaskInformation = new TaskDeploymentDescriptor.NonOffloaded<>(taskInformationOrBlobKey.left()); } else { serializedTaskInformation = new TaskDeploymentDescriptor.Offloaded<>(taskInformationOrBlobKey.right()); } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)