This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 10b2185e8094df3e951c95688f07641af1dab737
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Apr 24 17:35:01 2019 +0200

    [FLINK-12323] Remove legacy ActorGateway based interface implementations
    
    Remove ActorGatewayCheckpointResponder
    
    Remove ActorGatewayGlobalAggregateManager
    
    Remove ActorGatewayKvStateLocationOracle
    
    Remove ActorGatewayKvStateRegistryListener
    
    Remove ActorGatewayPartitionProducerStateChecker
    
    Remove ActorGatewayResultPartitionConsumableNotifier
    
    Remove ActorGatewayTaskManagerActions
    
    Remove TaskInputSplitProvider
    
    This closes #8256.
---
 .../ActorGatewayCheckpointResponder.java           |  71 --------------
 .../ActorGatewayGlobalAggregateManager.java        |  32 ------
 .../ActorGatewayKvStateLocationOracle.java         |  63 ------------
 .../ActorGatewayKvStateRegistryListener.java       |  84 ----------------
 .../ActorGatewayPartitionProducerStateChecker.java |  68 -------------
 ...orGatewayResultPartitionConsumableNotifier.java |  78 ---------------
 .../ActorGatewayTaskManagerActions.java            |  59 -----------
 .../taskmanager/TaskInputSplitProvider.java        | 108 ---------------------
 .../taskmanager/TaskInputSplitProviderTest.java    |  80 ---------------
 9 files changed, 643 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
deleted file mode 100644
index e9f600d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Implementation using {@link ActorGateway} to forward the messages.
- */
-public class ActorGatewayCheckpointResponder implements CheckpointResponder {
-
-       private final ActorGateway actorGateway;
-
-       public ActorGatewayCheckpointResponder(ActorGateway actorGateway) {
-               this.actorGateway = Preconditions.checkNotNull(actorGateway);
-       }
-
-       @Override
-       public void acknowledgeCheckpoint(
-                       JobID jobID,
-                       ExecutionAttemptID executionAttemptID,
-                       long checkpointId,
-                       CheckpointMetrics checkpointMetrics,
-                       TaskStateSnapshot checkpointStateHandles) {
-
-               AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
-                               jobID, executionAttemptID, checkpointId, 
checkpointMetrics,
-                               checkpointStateHandles);
-
-               actorGateway.tell(message);
-       }
-
-       @Override
-       public void declineCheckpoint(
-                       JobID jobID,
-                       ExecutionAttemptID executionAttemptID,
-                       long checkpointId,
-                       Throwable reason) {
-
-               DeclineCheckpoint decline = new DeclineCheckpoint(
-                       jobID,
-                       executionAttemptID,
-                       checkpointId,
-                       reason);
-
-               actorGateway.tell(decline);
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayGlobalAggregateManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayGlobalAggregateManager.java
deleted file mode 100644
index 8a793b4..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayGlobalAggregateManager.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import java.io.IOException;
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
-
-public class ActorGatewayGlobalAggregateManager implements 
GlobalAggregateManager{
-
-       @Override
-       public <IN, ACC, OUT> OUT updateGlobalAggregate(String aggregateName, 
Object aggregand,
-               AggregateFunction<IN, ACC, OUT> aggregateFunction) throws 
IOException {
-               throw new UnsupportedOperationException("The 
updateGlobalAggregate() functionality is only supported for 
TaskExecutor/JobMaster based deployments");
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateLocationOracle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateLocationOracle.java
deleted file mode 100644
index d874438..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateLocationOracle.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage;
-import org.apache.flink.util.Preconditions;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-/**
- * {@link KvStateLocationOracle} implementation for {@link ActorGateway}.
- */
-public class ActorGatewayKvStateLocationOracle implements 
KvStateLocationOracle {
-
-       private final ActorGateway jobManagerActorGateway;
-
-       private final FiniteDuration timeout;
-
-       public ActorGatewayKvStateLocationOracle(
-                       ActorGateway jobManagerActorGateway,
-                       Time timeout) {
-               this.jobManagerActorGateway = 
Preconditions.checkNotNull(jobManagerActorGateway);
-
-               Preconditions.checkNotNull(timeout);
-               this.timeout = FiniteDuration.apply(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
-       }
-
-       @Override
-       public CompletableFuture<KvStateLocation> requestKvStateLocation(JobID 
jobId, String registrationName) {
-               final KvStateMessage.LookupKvStateLocation 
lookupKvStateLocation = new KvStateMessage.LookupKvStateLocation(jobId, 
registrationName);
-
-               return FutureUtils.toJava(
-                       jobManagerActorGateway
-                               .ask(lookupKvStateLocation, timeout)
-                               
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)));
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
deleted file mode 100644
index 63bda99..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.queryablestate.KvStateID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.query.KvStateMessage;
-import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.util.Preconditions;
-
-import java.net.InetSocketAddress;
-
-/**
- * This implementation uses {@link ActorGateway} to forward key-value state 
notifications to the job
- * manager. The notifications are wrapped in an actor message and send to the 
given actor gateway.
- */
-public class ActorGatewayKvStateRegistryListener implements 
KvStateRegistryListener {
-
-       private ActorGateway jobManager;
-
-       private InetSocketAddress kvStateServerAddress;
-
-       public ActorGatewayKvStateRegistryListener(
-               ActorGateway jobManager,
-               InetSocketAddress kvStateServerAddress) {
-
-               this.jobManager = Preconditions.checkNotNull(jobManager, 
"JobManager");
-               this.kvStateServerAddress = 
Preconditions.checkNotNull(kvStateServerAddress, "ServerAddress");
-       }
-
-       @Override
-       public void notifyKvStateRegistered(
-               JobID jobId,
-               JobVertexID jobVertexId,
-               KeyGroupRange keyGroupRange,
-               String registrationName,
-               KvStateID kvStateId) {
-
-               Object msg = new KvStateMessage.NotifyKvStateRegistered(
-                       jobId,
-                       jobVertexId,
-                       keyGroupRange,
-                       registrationName,
-                       kvStateId,
-                       kvStateServerAddress);
-
-               jobManager.tell(msg);
-       }
-
-       @Override
-       public void notifyKvStateUnregistered(
-               JobID jobId,
-               JobVertexID jobVertexId,
-               KeyGroupRange keyGroupRange,
-               String registrationName) {
-
-               Object msg = new KvStateMessage.NotifyKvStateUnregistered(
-                       jobId,
-                       jobVertexId,
-                       keyGroupRange,
-                       registrationName);
-
-               jobManager.tell(msg);
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
deleted file mode 100644
index cb031d2..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.util.Preconditions;
-
-import java.util.concurrent.CompletableFuture;
-
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-/**
- * This implementation uses {@link ActorGateway} to trigger the partition 
state check at the job
- * manager.
- */
-public class ActorGatewayPartitionProducerStateChecker implements 
PartitionProducerStateChecker {
-
-       private final ActorGateway jobManager;
-       private final FiniteDuration timeout;
-
-       public ActorGatewayPartitionProducerStateChecker(ActorGateway 
jobManager, FiniteDuration timeout) {
-               this.jobManager = Preconditions.checkNotNull(jobManager);
-               this.timeout = Preconditions.checkNotNull(timeout);
-       }
-
-       @Override
-       public CompletableFuture<ExecutionState> requestPartitionProducerState(
-                       JobID jobId,
-                       IntermediateDataSetID intermediateDataSetId,
-                       ResultPartitionID resultPartitionId) {
-
-               JobManagerMessages.RequestPartitionProducerState msg = new 
JobManagerMessages.RequestPartitionProducerState(
-                       jobId,
-                       intermediateDataSetId, resultPartitionId
-               );
-
-               scala.concurrent.Future<ExecutionState> futureResponse = 
jobManager
-                       .ask(msg, timeout)
-                       
.mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
-
-               return FutureUtils.toJava(futureResponse);
-       }
-
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
deleted file mode 100644
index ee7d3fb..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import akka.dispatch.OnFailure;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * This implementation uses {@link ActorGateway} to notify the job manager 
about consumable
- * partitions.
- */
-public class ActorGatewayResultPartitionConsumableNotifier implements 
ResultPartitionConsumableNotifier {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(ActorGatewayResultPartitionConsumableNotifier.class);
-
-       /**
-        * {@link ExecutionContext} which is used for the failure handler of
-        * {@link JobManagerMessages.ScheduleOrUpdateConsumers} messages.
-        */
-       private final ExecutionContext executionContext;
-
-       private final ActorGateway jobManager;
-
-       private final FiniteDuration jobManagerMessageTimeout;
-
-       public ActorGatewayResultPartitionConsumableNotifier(
-               ExecutionContext executionContext,
-               ActorGateway jobManager,
-               FiniteDuration jobManagerMessageTimeout) {
-
-               this.executionContext = 
Preconditions.checkNotNull(executionContext);
-               this.jobManager = Preconditions.checkNotNull(jobManager);
-               this.jobManagerMessageTimeout = 
Preconditions.checkNotNull(jobManagerMessageTimeout);
-       }
-
-       @Override
-       public void notifyPartitionConsumable(JobID jobId, final 
ResultPartitionID partitionId, final TaskActions taskActions) {
-
-               final JobManagerMessages.ScheduleOrUpdateConsumers msg = new 
JobManagerMessages.ScheduleOrUpdateConsumers(jobId, partitionId);
-
-               Future<Object> futureResponse = jobManager.ask(msg, 
jobManagerMessageTimeout);
-
-               futureResponse.onFailure(new OnFailure() {
-                       @Override
-                       public void onFailure(Throwable failure) {
-                               LOG.error("Could not schedule or update 
consumers at the JobManager.", failure);
-
-                               taskActions.failExternally(new 
RuntimeException("Could not notify JobManager to schedule or update consumers", 
failure));
-                       }
-               }, executionContext);
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
deleted file mode 100644
index 1731fcc..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.messages.TaskMessages;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Implementation using {@link ActorGateway} to forward the messages.
- */
-public class ActorGatewayTaskManagerActions implements TaskManagerActions {
-
-       private final ActorGateway actorGateway;
-
-       public ActorGatewayTaskManagerActions(ActorGateway actorGateway) {
-               this.actorGateway = Preconditions.checkNotNull(actorGateway);
-       }
-
-       @Override
-       public void notifyFatalError(String message, Throwable cause) {
-               actorGateway.tell(new TaskManagerMessages.FatalError(message, 
cause));
-       }
-
-       @Override
-       public void failTask(ExecutionAttemptID executionAttemptID, Throwable 
cause) {
-               actorGateway.tell(new TaskMessages.FailTask(executionAttemptID, 
cause));
-       }
-
-       @Override
-       public void updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
-               final TaskMessages.TaskMessage taskMessage;
-               if (taskExecutionState.getExecutionState().isTerminal()) {
-                       taskMessage = new 
TaskMessages.TaskInFinalState(taskExecutionState.getID());
-               } else {
-                       taskMessage = new 
TaskMessages.UpdateTaskExecutionState(taskExecutionState);
-               }
-
-               actorGateway.tell(taskMessage);
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
deleted file mode 100644
index 31c518a..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.util.InstantiationUtil;
-
-import org.apache.flink.util.Preconditions;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Implementation using {@link ActorGateway} to forward the messages.
- */
-public class TaskInputSplitProvider implements InputSplitProvider {
-
-       private final ActorGateway jobManager;
-       
-       private final JobID jobID;
-       
-       private final JobVertexID vertexID;
-
-       private final ExecutionAttemptID executionID;
-
-       private final FiniteDuration timeout;
-
-
-       public TaskInputSplitProvider(
-               ActorGateway jobManager,
-               JobID jobID,
-               JobVertexID vertexID,
-               ExecutionAttemptID executionID,
-               FiniteDuration timeout) {
-
-               this.jobManager = Preconditions.checkNotNull(jobManager);
-               this.jobID = Preconditions.checkNotNull(jobID);
-               this.vertexID = Preconditions.checkNotNull(vertexID);
-               this.executionID = Preconditions.checkNotNull(executionID);
-               this.timeout = Preconditions.checkNotNull(timeout);
-       }
-
-       @Override
-       public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) 
throws InputSplitProviderException {
-               Preconditions.checkNotNull(userCodeClassLoader);
-
-               final Future<Object> response = jobManager.ask(
-                       new JobManagerMessages.RequestNextInputSplit(jobID, 
vertexID, executionID),
-                       timeout);
-
-               final Object result;
-
-               try {
-                       result = Await.result(response, timeout);
-               } catch (Exception e) {
-                       throw new InputSplitProviderException("Did not receive 
next input split from JobManager.", e);
-               }
-
-               if(result instanceof JobManagerMessages.NextInputSplit){
-                       final JobManagerMessages.NextInputSplit nextInputSplit =
-                               (JobManagerMessages.NextInputSplit) result;
-
-                       byte[] serializedData = nextInputSplit.splitData();
-
-                       if(serializedData == null) {
-                               return null;
-                       } else {
-                               final Object deserialized;
-
-                               try {
-                                       deserialized = 
InstantiationUtil.deserializeObject(serializedData,
-                                               userCodeClassLoader);
-                               } catch (Exception e) {
-                                       throw new 
InputSplitProviderException("Could not deserialize the serialized input 
split.", e);
-                               }
-
-                               return (InputSplit) deserialized;
-                       }
-               } else {
-                       throw new 
InputSplitProviderException("RequestNextInputSplit requires a response of type 
" +
-                               "NextInputSplit. Instead response is of type " 
+ result.getClass() + '.');
-               }
-
-       }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
deleted file mode 100644
index 777633d..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.BaseTestingActorGateway;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.junit.Test;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.*;
-
-import java.util.concurrent.TimeUnit;
-
-public class TaskInputSplitProviderTest {
-
-       @Test
-       public void testRequestNextInputSplitWithInvalidExecutionID() throws 
InputSplitProviderException {
-
-               final JobID jobID = new JobID();
-               final JobVertexID vertexID = new JobVertexID();
-               final ExecutionAttemptID executionID = new ExecutionAttemptID();
-               final FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
-
-               final ActorGateway gateway = new NullInputSplitGateway();
-
-
-               final TaskInputSplitProvider provider = new 
TaskInputSplitProvider(
-                       gateway,
-                       jobID,
-                       vertexID,
-                       executionID,
-                       timeout);
-
-               // The jobManager will return a
-               InputSplit nextInputSplit = 
provider.getNextInputSplit(getClass().getClassLoader());
-
-               assertTrue(nextInputSplit == null);
-       }
-
-       public static class NullInputSplitGateway extends 
BaseTestingActorGateway {
-
-               private static final long serialVersionUID = 
-7733997150554492926L;
-
-               public NullInputSplitGateway() {
-                       super(TestingUtils.defaultExecutionContext());
-               }
-
-               @Override
-               public Object handleMessage(Object message) throws Exception {
-                       if(message instanceof 
JobManagerMessages.RequestNextInputSplit) {
-                               return new 
JobManagerMessages.NextInputSplit(null);
-                       } else {
-                               throw new Exception("Invalid message type");
-                       }
-               }
-       }
-}

Reply via email to