This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 10b2185e8094df3e951c95688f07641af1dab737 Author: Till Rohrmann <[email protected]> AuthorDate: Wed Apr 24 17:35:01 2019 +0200 [FLINK-12323] Remove legacy ActorGateway based interface implementations Remove ActorGatewayCheckpointResponder Remove ActorGatewayGlobalAggregateManager Remove ActorGatewayKvStateLocationOracle Remove ActorGatewayKvStateRegistryListener Remove ActorGatewayPartitionProducerStateChecker Remove ActorGatewayResultPartitionConsumableNotifier Remove ActorGatewayTaskManagerActions Remove TaskInputSplitProvider This closes #8256. --- .../ActorGatewayCheckpointResponder.java | 71 -------------- .../ActorGatewayGlobalAggregateManager.java | 32 ------ .../ActorGatewayKvStateLocationOracle.java | 63 ------------ .../ActorGatewayKvStateRegistryListener.java | 84 ---------------- .../ActorGatewayPartitionProducerStateChecker.java | 68 ------------- ...orGatewayResultPartitionConsumableNotifier.java | 78 --------------- .../ActorGatewayTaskManagerActions.java | 59 ----------- .../taskmanager/TaskInputSplitProvider.java | 108 --------------------- .../taskmanager/TaskInputSplitProviderTest.java | 80 --------------- 9 files changed, 643 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java deleted file mode 100644 index e9f600d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; -import org.apache.flink.util.Preconditions; - -/** - * Implementation using {@link ActorGateway} to forward the messages. - */ -public class ActorGatewayCheckpointResponder implements CheckpointResponder { - - private final ActorGateway actorGateway; - - public ActorGatewayCheckpointResponder(ActorGateway actorGateway) { - this.actorGateway = Preconditions.checkNotNull(actorGateway); - } - - @Override - public void acknowledgeCheckpoint( - JobID jobID, - ExecutionAttemptID executionAttemptID, - long checkpointId, - CheckpointMetrics checkpointMetrics, - TaskStateSnapshot checkpointStateHandles) { - - AcknowledgeCheckpoint message = new AcknowledgeCheckpoint( - jobID, executionAttemptID, checkpointId, checkpointMetrics, - checkpointStateHandles); - - actorGateway.tell(message); - } - - @Override - public void declineCheckpoint( - JobID jobID, - ExecutionAttemptID executionAttemptID, - long checkpointId, - Throwable reason) { - - DeclineCheckpoint decline = new DeclineCheckpoint( - jobID, - executionAttemptID, - checkpointId, - reason); - - actorGateway.tell(decline); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayGlobalAggregateManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayGlobalAggregateManager.java deleted file mode 100644 index 8a793b4..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayGlobalAggregateManager.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import java.io.IOException; -import org.apache.flink.api.common.functions.AggregateFunction; -import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; - -public class ActorGatewayGlobalAggregateManager implements GlobalAggregateManager{ - - @Override - public <IN, ACC, OUT> OUT updateGlobalAggregate(String aggregateName, Object aggregand, - AggregateFunction<IN, ACC, OUT> aggregateFunction) throws IOException { - throw new UnsupportedOperationException("The updateGlobalAggregate() functionality is only supported for TaskExecutor/JobMaster based deployments"); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateLocationOracle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateLocationOracle.java deleted file mode 100644 index d874438..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateLocationOracle.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.jobmaster.KvStateLocationOracle; -import org.apache.flink.runtime.query.KvStateLocation; -import org.apache.flink.runtime.query.KvStateMessage; -import org.apache.flink.util.Preconditions; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -import scala.concurrent.duration.FiniteDuration; -import scala.reflect.ClassTag$; - -/** - * {@link KvStateLocationOracle} implementation for {@link ActorGateway}. - */ -public class ActorGatewayKvStateLocationOracle implements KvStateLocationOracle { - - private final ActorGateway jobManagerActorGateway; - - private final FiniteDuration timeout; - - public ActorGatewayKvStateLocationOracle( - ActorGateway jobManagerActorGateway, - Time timeout) { - this.jobManagerActorGateway = Preconditions.checkNotNull(jobManagerActorGateway); - - Preconditions.checkNotNull(timeout); - this.timeout = FiniteDuration.apply(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); - } - - @Override - public CompletableFuture<KvStateLocation> requestKvStateLocation(JobID jobId, String registrationName) { - final KvStateMessage.LookupKvStateLocation lookupKvStateLocation = new KvStateMessage.LookupKvStateLocation(jobId, registrationName); - - return FutureUtils.toJava( - jobManagerActorGateway - .ask(lookupKvStateLocation, timeout) - .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java deleted file mode 100644 index 63bda99..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.queryablestate.KvStateID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.query.KvStateMessage; -import org.apache.flink.runtime.query.KvStateRegistryListener; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.util.Preconditions; - -import java.net.InetSocketAddress; - -/** - * This implementation uses {@link ActorGateway} to forward key-value state notifications to the job - * manager. The notifications are wrapped in an actor message and send to the given actor gateway. - */ -public class ActorGatewayKvStateRegistryListener implements KvStateRegistryListener { - - private ActorGateway jobManager; - - private InetSocketAddress kvStateServerAddress; - - public ActorGatewayKvStateRegistryListener( - ActorGateway jobManager, - InetSocketAddress kvStateServerAddress) { - - this.jobManager = Preconditions.checkNotNull(jobManager, "JobManager"); - this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "ServerAddress"); - } - - @Override - public void notifyKvStateRegistered( - JobID jobId, - JobVertexID jobVertexId, - KeyGroupRange keyGroupRange, - String registrationName, - KvStateID kvStateId) { - - Object msg = new KvStateMessage.NotifyKvStateRegistered( - jobId, - jobVertexId, - keyGroupRange, - registrationName, - kvStateId, - kvStateServerAddress); - - jobManager.tell(msg); - } - - @Override - public void notifyKvStateUnregistered( - JobID jobId, - JobVertexID jobVertexId, - KeyGroupRange keyGroupRange, - String registrationName) { - - Object msg = new KvStateMessage.NotifyKvStateUnregistered( - jobId, - jobVertexId, - keyGroupRange, - registrationName); - - jobManager.tell(msg); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java deleted file mode 100644 index cb031d2..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.util.Preconditions; - -import java.util.concurrent.CompletableFuture; - -import scala.concurrent.duration.FiniteDuration; -import scala.reflect.ClassTag$; - -/** - * This implementation uses {@link ActorGateway} to trigger the partition state check at the job - * manager. - */ -public class ActorGatewayPartitionProducerStateChecker implements PartitionProducerStateChecker { - - private final ActorGateway jobManager; - private final FiniteDuration timeout; - - public ActorGatewayPartitionProducerStateChecker(ActorGateway jobManager, FiniteDuration timeout) { - this.jobManager = Preconditions.checkNotNull(jobManager); - this.timeout = Preconditions.checkNotNull(timeout); - } - - @Override - public CompletableFuture<ExecutionState> requestPartitionProducerState( - JobID jobId, - IntermediateDataSetID intermediateDataSetId, - ResultPartitionID resultPartitionId) { - - JobManagerMessages.RequestPartitionProducerState msg = new JobManagerMessages.RequestPartitionProducerState( - jobId, - intermediateDataSetId, resultPartitionId - ); - - scala.concurrent.Future<ExecutionState> futureResponse = jobManager - .ask(msg, timeout) - .mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class)); - - return FutureUtils.toJava(futureResponse); - } - -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java deleted file mode 100644 index ee7d3fb..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import akka.dispatch.OnFailure; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -/** - * This implementation uses {@link ActorGateway} to notify the job manager about consumable - * partitions. - */ -public class ActorGatewayResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier { - - private static final Logger LOG = LoggerFactory.getLogger(ActorGatewayResultPartitionConsumableNotifier.class); - - /** - * {@link ExecutionContext} which is used for the failure handler of - * {@link JobManagerMessages.ScheduleOrUpdateConsumers} messages. - */ - private final ExecutionContext executionContext; - - private final ActorGateway jobManager; - - private final FiniteDuration jobManagerMessageTimeout; - - public ActorGatewayResultPartitionConsumableNotifier( - ExecutionContext executionContext, - ActorGateway jobManager, - FiniteDuration jobManagerMessageTimeout) { - - this.executionContext = Preconditions.checkNotNull(executionContext); - this.jobManager = Preconditions.checkNotNull(jobManager); - this.jobManagerMessageTimeout = Preconditions.checkNotNull(jobManagerMessageTimeout); - } - - @Override - public void notifyPartitionConsumable(JobID jobId, final ResultPartitionID partitionId, final TaskActions taskActions) { - - final JobManagerMessages.ScheduleOrUpdateConsumers msg = new JobManagerMessages.ScheduleOrUpdateConsumers(jobId, partitionId); - - Future<Object> futureResponse = jobManager.ask(msg, jobManagerMessageTimeout); - - futureResponse.onFailure(new OnFailure() { - @Override - public void onFailure(Throwable failure) { - LOG.error("Could not schedule or update consumers at the JobManager.", failure); - - taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers", failure)); - } - }, executionContext); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java deleted file mode 100644 index 1731fcc..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.TaskManagerMessages; -import org.apache.flink.runtime.messages.TaskMessages; -import org.apache.flink.util.Preconditions; - -/** - * Implementation using {@link ActorGateway} to forward the messages. - */ -public class ActorGatewayTaskManagerActions implements TaskManagerActions { - - private final ActorGateway actorGateway; - - public ActorGatewayTaskManagerActions(ActorGateway actorGateway) { - this.actorGateway = Preconditions.checkNotNull(actorGateway); - } - - @Override - public void notifyFatalError(String message, Throwable cause) { - actorGateway.tell(new TaskManagerMessages.FatalError(message, cause)); - } - - @Override - public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) { - actorGateway.tell(new TaskMessages.FailTask(executionAttemptID, cause)); - } - - @Override - public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { - final TaskMessages.TaskMessage taskMessage; - if (taskExecutionState.getExecutionState().isTerminal()) { - taskMessage = new TaskMessages.TaskInFinalState(taskExecutionState.getID()); - } else { - taskMessage = new TaskMessages.UpdateTaskExecutionState(taskExecutionState); - } - - actorGateway.tell(taskMessage); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java deleted file mode 100644 index 31c518a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; -import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.util.InstantiationUtil; - -import org.apache.flink.util.Preconditions; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -/** - * Implementation using {@link ActorGateway} to forward the messages. - */ -public class TaskInputSplitProvider implements InputSplitProvider { - - private final ActorGateway jobManager; - - private final JobID jobID; - - private final JobVertexID vertexID; - - private final ExecutionAttemptID executionID; - - private final FiniteDuration timeout; - - - public TaskInputSplitProvider( - ActorGateway jobManager, - JobID jobID, - JobVertexID vertexID, - ExecutionAttemptID executionID, - FiniteDuration timeout) { - - this.jobManager = Preconditions.checkNotNull(jobManager); - this.jobID = Preconditions.checkNotNull(jobID); - this.vertexID = Preconditions.checkNotNull(vertexID); - this.executionID = Preconditions.checkNotNull(executionID); - this.timeout = Preconditions.checkNotNull(timeout); - } - - @Override - public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException { - Preconditions.checkNotNull(userCodeClassLoader); - - final Future<Object> response = jobManager.ask( - new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID), - timeout); - - final Object result; - - try { - result = Await.result(response, timeout); - } catch (Exception e) { - throw new InputSplitProviderException("Did not receive next input split from JobManager.", e); - } - - if(result instanceof JobManagerMessages.NextInputSplit){ - final JobManagerMessages.NextInputSplit nextInputSplit = - (JobManagerMessages.NextInputSplit) result; - - byte[] serializedData = nextInputSplit.splitData(); - - if(serializedData == null) { - return null; - } else { - final Object deserialized; - - try { - deserialized = InstantiationUtil.deserializeObject(serializedData, - userCodeClassLoader); - } catch (Exception e) { - throw new InputSplitProviderException("Could not deserialize the serialized input split.", e); - } - - return (InputSplit) deserialized; - } - } else { - throw new InputSplitProviderException("RequestNextInputSplit requires a response of type " + - "NextInputSplit. Instead response is of type " + result.getClass() + '.'); - } - - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java deleted file mode 100644 index 777633d..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.instance.BaseTestingActorGateway; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.junit.Test; -import scala.concurrent.duration.FiniteDuration; - -import static org.junit.Assert.*; - -import java.util.concurrent.TimeUnit; - -public class TaskInputSplitProviderTest { - - @Test - public void testRequestNextInputSplitWithInvalidExecutionID() throws InputSplitProviderException { - - final JobID jobID = new JobID(); - final JobVertexID vertexID = new JobVertexID(); - final ExecutionAttemptID executionID = new ExecutionAttemptID(); - final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); - - final ActorGateway gateway = new NullInputSplitGateway(); - - - final TaskInputSplitProvider provider = new TaskInputSplitProvider( - gateway, - jobID, - vertexID, - executionID, - timeout); - - // The jobManager will return a - InputSplit nextInputSplit = provider.getNextInputSplit(getClass().getClassLoader()); - - assertTrue(nextInputSplit == null); - } - - public static class NullInputSplitGateway extends BaseTestingActorGateway { - - private static final long serialVersionUID = -7733997150554492926L; - - public NullInputSplitGateway() { - super(TestingUtils.defaultExecutionContext()); - } - - @Override - public Object handleMessage(Object message) throws Exception { - if(message instanceof JobManagerMessages.RequestNextInputSplit) { - return new JobManagerMessages.NextInputSplit(null); - } else { - throw new Exception("Invalid message type"); - } - } - } -}
