http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
new file mode 100644
index 0000000..80f2aa0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Signals a checkpointing problem on the {@link TaskExecutor} side.
+ */
+public class CheckpointException extends TaskManagerException {
+
+	private static final long serialVersionUID = 3366394086880327955L;
+
+	public CheckpointException(Throwable cause) {
+		super(cause);
+	}
+
+	public CheckpointException(String message) {
+		super(message);
+	}
+
+	public CheckpointException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
new file mode 100644
index 0000000..eecd0ae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/PartitionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Exception indicating a problem with the result partitions on the {@link TaskExecutor} side.
+ */
+public class PartitionException extends TaskManagerException {
+
+	private static final long serialVersionUID = 6248696963418276618L;
+
+	public PartitionException(String message) {
+		super(message);
+	}
+
+	public PartitionException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public PartitionException(Throwable cause) {
+		super(cause);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
new file mode 100644
index 0000000..a4a89c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Exception indicating a task related problem on the {@link TaskExecutor}.
+ */
+public class TaskException extends TaskManagerException {
+
+	private static final long serialVersionUID = 968001398103156856L;
+
+	public TaskException(String message) {
+		super(message);
+	}
+
+	public TaskException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public TaskException(Throwable cause) {
+		super(cause);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
new file mode 100644
index 0000000..62d186e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskManagerException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Base exception thrown by the {@link TaskExecutor}.
+ */
+public class TaskManagerException extends Exception {
+
+	private static final long serialVersionUID = -2997745772227694731L;
+
+	public TaskManagerException(String message) {
+		super(message);
+	}
+
+	public TaskManagerException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public TaskManagerException(Throwable cause) {
+		super(cause);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
new file mode 100644
index 0000000..23f7812
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/TaskSubmissionException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskmanager.Task;
+
+/**
+ * Exception indicating a problem with the {@link Task} submission at the {@link TaskExecutor}.
+ */
+public class TaskSubmissionException extends TaskManagerException {
+
+	private static final long serialVersionUID = 4589813591317690486L;
+
+	public TaskSubmissionException(String message) {
+		super(message);
+	}
+
+	public TaskSubmissionException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public TaskSubmissionException(Throwable cause) {
+		super(cause);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
new file mode 100644
index 0000000..246c11d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link CheckpointResponder} which forwards checkpoint acknowledgements and declines
+ * unchanged to a {@link CheckpointCoordinatorGateway}.
+ */
+public class RpcCheckpointResponder implements CheckpointResponder {
+
+	private final CheckpointCoordinatorGateway checkpointCoordinatorGateway;
+
+	public RpcCheckpointResponder(CheckpointCoordinatorGateway checkpointCoordinatorGateway) {
+		this.checkpointCoordinatorGateway = Preconditions.checkNotNull(checkpointCoordinatorGateway);
+	}
+
+	@Override
+	public void acknowledgeCheckpoint(
+			JobID jobID,
+			ExecutionAttemptID executionAttemptID,
+			CheckpointMetaData checkpointMetaData,
+			CheckpointStateHandles checkpointStateHandles) {
+
+		checkpointCoordinatorGateway.acknowledgeCheckpoint(
+				jobID,
+				executionAttemptID,
+				checkpointMetaData,
+				checkpointStateHandles);
+	}
+
+	@Override
+	public void declineCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, CheckpointMetaData checkpoint) {
+		checkpointCoordinatorGateway.declineCheckpoint(jobID, executionAttemptID, checkpoint);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
new file mode 100644
index 0000000..4850d63
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link InputSplitProvider} which requests the next input split for a job vertex from the
+ * {@link JobMasterGateway}.
+ */
+public class RpcInputSplitProvider implements InputSplitProvider {
+	private final JobMasterGateway jobMasterGateway;
+	private final JobID jobID;
+	private final JobVertexID jobVertexID;
+	private final ExecutionAttemptID executionAttemptID;
+	private final Time timeout;
+
+	public RpcInputSplitProvider(
+			JobMasterGateway jobMasterGateway,
+			JobID jobID,
+			JobVertexID jobVertexID,
+			ExecutionAttemptID executionAttemptID,
+			Time timeout) {
+		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+		this.jobID = Preconditions.checkNotNull(jobID);
+		this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
+		this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
+		this.timeout = Preconditions.checkNotNull(timeout);
+	}
+
+	/**
+	 * Requests the next input split from the {@link JobMasterGateway} and deserializes it with
+	 * the given user code class loader, waiting at most the configured timeout for the answer.
+	 *
+	 * @param userCodeClassLoader class loader used to deserialize the split; must not be null
+	 * @return the next input split, or {@code null} if the job master answered with an empty split
+	 * @throws InputSplitProviderException if requesting, awaiting or deserializing the split fails
+	 */
+	@Override
+	public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
+		Preconditions.checkNotNull(userCodeClassLoader);
+
+		Future<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(jobVertexID, executionAttemptID);
+
+		try {
+			SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());
+
+			if (serializedInputSplit.isEmpty()) {
+				return null;
+			} else {
+				return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader);
+			}
+		} catch (Exception e) {
+			if (e instanceof InterruptedException) {
+				// Throwing InterruptedException conventionally clears the flag; restore it for the caller.
+				Thread.currentThread().interrupt();
+			}
+			throw new InputSplitProviderException("Requesting the next input split failed.", e);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
new file mode 100644
index 0000000..3692a71
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistryGateway;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link KvStateRegistryListener} which forwards KvState registrations and unregistrations to a
+ * {@link KvStateRegistryGateway}. Registrations additionally carry the {@link KvStateServerAddress}
+ * passed to the constructor.
+ */
+public class RpcKvStateRegistryListener implements KvStateRegistryListener {
+
+	private final KvStateRegistryGateway kvStateRegistryGateway;
+	private final KvStateServerAddress kvStateServerAddress;
+
+	public RpcKvStateRegistryListener(
+			KvStateRegistryGateway kvStateRegistryGateway,
+			KvStateServerAddress kvStateServerAddress) {
+		this.kvStateRegistryGateway = Preconditions.checkNotNull(kvStateRegistryGateway);
+		this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress);
+	}
+
+	@Override
+	public void notifyKvStateRegistered(
+			JobID jobId,
+			JobVertexID jobVertexId,
+			KeyGroupRange keyGroupRange,
+			String registrationName,
+			KvStateID kvStateId) {
+		kvStateRegistryGateway.notifyKvStateRegistered(
+			jobId,
+			jobVertexId,
+			keyGroupRange,
+			registrationName,
+			kvStateId,
+			kvStateServerAddress);
+	}
+
+	@Override
+	public void notifyKvStateUnregistered(
+			JobID jobId,
+			JobVertexID jobVertexId,
+			KeyGroupRange keyGroupRange,
+			String registrationName) {
+
+		kvStateRegistryGateway.notifyKvStateUnregistered(
+			jobId,
+			jobVertexId,
+			keyGroupRange,
+			registrationName);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
new file mode 100644
index 0000000..ab111ad
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.PartitionState;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * {@link PartitionStateChecker} which requests the state of a result partition from the
+ * {@link JobMasterGateway}.
+ */
+public class RpcPartitionStateChecker implements PartitionStateChecker {
+
+	private final JobMasterGateway jobMasterGateway;
+
+	public RpcPartitionStateChecker(JobMasterGateway jobMasterGateway) {
+		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+	}
+
+	@Override
+	public Future<PartitionState> requestPartitionState(
+			JobID jobId,
+			ExecutionAttemptID executionId,
+			IntermediateDataSetID resultId,
+			ResultPartitionID partitionId) {
+
+		// NOTE(review): jobId is not forwarded; presumably the gateway is already bound to one job - confirm.
+		return jobMasterGateway.requestPartitionState(partitionId, executionId, resultId);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
new file mode 100644
index 0000000..29ad3b6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.rpc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+
+/**
+ * {@link ResultPartitionConsumableNotifier} which asks the {@link JobMasterGateway} to schedule or
+ * update the consumers of a result partition. If that request fails, the failure is handled
+ * asynchronously on the given executor: it is logged and {@code taskActions.failExternally} is called.
+ */
+public class RpcResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RpcResultPartitionConsumableNotifier.class);
+
+	private final JobMasterGateway jobMasterGateway;
+	private final Executor executor;
+	private final Time timeout;
+
+	public RpcResultPartitionConsumableNotifier(
+			JobMasterGateway jobMasterGateway,
+			Executor executor,
+			Time timeout) {
+		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+		this.executor = Preconditions.checkNotNull(executor);
+		this.timeout = Preconditions.checkNotNull(timeout);
+	}
+
+	@Override
+	public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) {
+		Future<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);
+
+		acknowledgeFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+			@Override
+			public Void apply(Throwable value) {
+				LOG.error("Could not schedule or update consumers at the JobManager.", value);
+
+				taskActions.failExternally(new RuntimeException("Could not notify JobManager to schedule or update consumers.", value));
+
+				return null;
+			}
+		}, executor);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
new file mode 100644
index 0000000..1f8d5ed
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/TaskExecutorMetricsInitializer.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.utils;
+
+import com.sun.management.OperatingSystemMXBean;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import java.lang.management.ClassLoadingMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.ThreadMXBean;
+import java.util.List;
+
+/**
+ * Utility class to initialize {@link TaskExecutor} specific metrics.
+ */
+public class TaskExecutorMetricsInitializer {
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorMetricsInitializer.class);
+
+	/**
+	 * Registers the task executor status metrics in a "Status" group of the given metric group:
+	 * network buffer metrics under "Network" and JVM metrics (class loading, garbage collection,
+	 * memory, threads, CPU) under "JVM".
+	 *
+	 * @param taskManagerMetricGroup metric group to which the "Status" group is added
+	 * @param network network environment whose buffer pool is reported
+	 */
+	public static void instantiateStatusMetrics(
+		MetricGroup taskManagerMetricGroup,
+		NetworkEnvironment network) {
+		MetricGroup status = taskManagerMetricGroup.addGroup("Status");
+
+		instantiateNetworkMetrics(status.addGroup("Network"), network);
+
+		MetricGroup jvm = status.addGroup("JVM");
+
+		instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
+		instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
+		instantiateMemoryMetrics(jvm.addGroup("Memory"));
+		instantiateThreadMetrics(jvm.addGroup("Threads"));
+		instantiateCPUMetrics(jvm.addGroup("CPU"));
+	}
+
+	private static void instantiateNetworkMetrics(
+		MetricGroup metrics,
+		final NetworkEnvironment network) {
+		metrics.<Long, Gauge<Long>>gauge("TotalMemorySegments", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return (long) network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
+			}
+		});
+
+		metrics.<Long, Gauge<Long>>gauge("AvailableMemorySegments", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return (long) network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
+			}
+		});
+	}
+
+	private static void instantiateClassLoaderMetrics(MetricGroup metrics) {
+		final ClassLoadingMXBean mxBean = ManagementFactory.getClassLoadingMXBean();
+
+		metrics.<Long, Gauge<Long>>gauge("ClassesLoaded", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getTotalLoadedClassCount();
+			}
+		});
+
+		metrics.<Long, Gauge<Long>>gauge("ClassesUnloaded", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getUnloadedClassCount();
+			}
+		});
+	}
+
+	private static void instantiateGarbageCollectorMetrics(MetricGroup metrics) {
+		List<GarbageCollectorMXBean> garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans();
+
+		for (final GarbageCollectorMXBean garbageCollector: garbageCollectors) {
+			MetricGroup gcGroup = metrics.addGroup(garbageCollector.getName());
+
+			gcGroup.<Long, Gauge<Long>>gauge("Count", new Gauge<Long> () {
+				@Override
+				public Long getValue() {
+					return garbageCollector.getCollectionCount();
+				}
+			});
+
+			gcGroup.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
+				@Override
+				public Long getValue() {
+					return garbageCollector.getCollectionTime();
+				}
+			});
+		}
+	}
+
+	private static void instantiateMemoryMetrics(MetricGroup metrics) {
+		final MemoryMXBean mxBean = ManagementFactory.getMemoryMXBean();
+
+		MetricGroup heap = metrics.addGroup("Heap");
+
+		heap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getHeapMemoryUsage().getUsed();
+			}
+		});
+		heap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getHeapMemoryUsage().getCommitted();
+			}
+		});
+		heap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getHeapMemoryUsage().getMax();
+			}
+		});
+
+		MetricGroup nonHeap = metrics.addGroup("NonHeap");
+
+		nonHeap.<Long, Gauge<Long>>gauge("Used", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getNonHeapMemoryUsage().getUsed();
+			}
+		});
+		nonHeap.<Long, Gauge<Long>>gauge("Committed", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getNonHeapMemoryUsage().getCommitted();
+			}
+		});
+		nonHeap.<Long, Gauge<Long>>gauge("Max", new Gauge<Long> () {
+			@Override
+			public Long getValue() {
+				return mxBean.getNonHeapMemoryUsage().getMax();
+			}
+		});
+
+		final MBeanServer con = ManagementFactory.getPlatformMBeanServer();
+
+		final String directBufferPoolName = "java.nio:type=BufferPool,name=direct";
+
+		try {
+			final ObjectName directObjectName = new ObjectName(directBufferPoolName);
+
+			MetricGroup direct = metrics.addGroup("Direct");
+
+			direct.<Long, Gauge<Long>>gauge("Count", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "Count", -1L));
+			direct.<Long, Gauge<Long>>gauge("MemoryUsed", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "MemoryUsed", -1L));
+			direct.<Long, Gauge<Long>>gauge("TotalCapacity", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, directObjectName, "TotalCapacity", -1L));
+		} catch (MalformedObjectNameException e) {
+			LOG.warn("Could not create object name {}.", directBufferPoolName, e);
+		}
+
+		final String mappedBufferPoolName = "java.nio:type=BufferPool,name=mapped";
+
+		try {
+			final ObjectName mappedObjectName = new ObjectName(mappedBufferPoolName);
+
+			MetricGroup mapped = metrics.addGroup("Mapped");
+
+			mapped.<Long, Gauge<Long>>gauge("Count", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "Count", -1L));
+			mapped.<Long, Gauge<Long>>gauge("MemoryUsed", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "MemoryUsed", -1L));
+			mapped.<Long, Gauge<Long>>gauge("TotalCapacity", new TaskExecutorMetricsInitializer.AttributeGauge<>(con, mappedObjectName, "TotalCapacity", -1L));
+		} catch (MalformedObjectNameException e) {
+			LOG.warn("Could not create object name {}.", mappedBufferPoolName, e);
+		}
+	}
+
+	private static void instantiateThreadMetrics(MetricGroup metrics) {
+		final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+
+		metrics.<Integer, Gauge<Integer>>gauge("Count", new Gauge<Integer> () {
+			@Override
+			public Integer getValue() {
+				return mxBean.getThreadCount();
+			}
+		});
+	}
+
+	private static void instantiateCPUMetrics(MetricGroup metrics) {
+		try {
+			// The cast fails with a ClassCastException if the platform bean is not a com.sun.management one; caught below.
+			final OperatingSystemMXBean mxBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
+
+			metrics.<Double, Gauge<Double>>gauge("Load", new Gauge<Double> () {
+				@Override
+				public Double getValue() {
+					return mxBean.getProcessCpuLoad();
+				}
+			});
+			metrics.<Long, Gauge<Long>>gauge("Time", new Gauge<Long> () {
+				@Override
+				public Long getValue() {
+					return mxBean.getProcessCpuTime();
+				}
+			});
+		} catch (Exception e) {
+			LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+				" - CPU load metrics will not be available.", e);
+		}
+	}
+
+	/**
+	 * Gauge which reads one attribute of a JMX MBean on every call and returns the given error
+	 * value if the attribute cannot be read.
+	 */
+	private static final class AttributeGauge<T> implements Gauge<T> {
+		private final MBeanServer server;
+		private final ObjectName objectName;
+		private final String attributeName;
+		private final T errorValue;
+
+		private AttributeGauge(MBeanServer server, ObjectName objectName, String attributeName, T errorValue) {
+			this.server = Preconditions.checkNotNull(server);
+			this.objectName = Preconditions.checkNotNull(objectName);
+			this.attributeName = Preconditions.checkNotNull(attributeName);
+			this.errorValue = errorValue;
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public T getValue() {
+			try {
+				return (T) server.getAttribute(objectName, attributeName);
+			} catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
+				LOG.warn("Could not read attribute {} of {}.", attributeName, objectName, e);
+				return errorValue;
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
new file mode 100644
index 0000000..b3a0cbb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerActions.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.messages.TaskManagerMessages; +import org.apache.flink.runtime.messages.TaskMessages; +import org.apache.flink.util.Preconditions; + +/** + * Implementation using {@link ActorGateway} to forward the messages. 
+ */ +public class ActorGatewayTaskManagerActions implements TaskManagerActions { + + private final ActorGateway actorGateway; + + public ActorGatewayTaskManagerActions(ActorGateway actorGateway) { + this.actorGateway = Preconditions.checkNotNull(actorGateway); + } + + @Override + public void notifyFinalState(ExecutionAttemptID executionAttemptID) { + actorGateway.tell(new TaskMessages.TaskInFinalState(executionAttemptID)); + } + + @Override + public void notifyFatalError(String message, Throwable cause) { + actorGateway.tell(new TaskManagerMessages.FatalError(message, cause)); + } + + @Override + public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) { + actorGateway.tell(new TaskMessages.FailTask(executionAttemptID, cause)); + } + + @Override + public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { + TaskMessages.UpdateTaskExecutionState actorMessage = new TaskMessages.UpdateTaskExecutionState(taskExecutionState); + + actorGateway.tell(actorMessage); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java deleted file mode 100644 index cddac55..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayTaskManagerConnection.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.messages.TaskManagerMessages; -import org.apache.flink.runtime.messages.TaskMessages; -import org.apache.flink.util.Preconditions; - -/** - * Implementation using {@link ActorGateway} to forward the messages. 
- */ -public class ActorGatewayTaskManagerConnection implements TaskManagerConnection { - - private final ActorGateway actorGateway; - - public ActorGatewayTaskManagerConnection(ActorGateway actorGateway) { - this.actorGateway = Preconditions.checkNotNull(actorGateway); - } - - @Override - public void notifyFinalState(ExecutionAttemptID executionAttemptID) { - actorGateway.tell(new TaskMessages.TaskInFinalState(executionAttemptID)); - } - - @Override - public void notifyFatalError(String message, Throwable cause) { - actorGateway.tell(new TaskManagerMessages.FatalError(message, cause)); - } - - @Override - public void failTask(ExecutionAttemptID executionAttemptID, Throwable cause) { - actorGateway.tell(new TaskMessages.FailTask(executionAttemptID, cause)); - } - - @Override - public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { - TaskMessages.UpdateTaskExecutionState actorMessage = new TaskMessages.UpdateTaskExecutionState(taskExecutionState); - - actorGateway.tell(actorMessage); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 02a41b5..977e563 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import 
org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.io.network.PartitionState; @@ -131,6 +132,9 @@ public class Task implements Runnable, TaskActions { /** The execution attempt of the parallel subtask */ private final ExecutionAttemptID executionId; + /** ID which identifies the slot in which the task is supposed to run */ + private final AllocationID allocationID; + /** TaskInfo object for this task */ private final TaskInfo taskInfo; @@ -176,7 +180,7 @@ public class Task implements Runnable, TaskActions { private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById; /** Connection to the task manager */ - private final TaskManagerConnection taskManagerConnection; + private final TaskManagerActions taskManagerActions; /** Input split provider for the task */ private final InputSplitProvider inputSplitProvider; @@ -259,7 +263,7 @@ public class Task implements Runnable, TaskActions { IOManager ioManager, NetworkEnvironment networkEnvironment, BroadcastVariableManager bcVarManager, - TaskManagerConnection taskManagerConnection, + TaskManagerActions taskManagerActions, InputSplitProvider inputSplitProvider, CheckpointResponder checkpointResponder, LibraryCacheManager libraryCache, @@ -274,6 +278,7 @@ public class Task implements Runnable, TaskActions { this.jobId = checkNotNull(tdd.getJobID()); this.vertexId = checkNotNull(tdd.getVertexID()); this.executionId = checkNotNull(tdd.getExecutionId()); + this.allocationID = checkNotNull(tdd.getAllocationID()); this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks(); this.jobConfiguration = checkNotNull(tdd.getJobConfiguration()); this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration()); @@ -296,7 +301,7 @@ public class Task implements Runnable, TaskActions { this.inputSplitProvider = checkNotNull(inputSplitProvider); this.checkpointResponder = checkNotNull(checkpointResponder); - this.taskManagerConnection = checkNotNull(taskManagerConnection); + 
this.taskManagerActions = checkNotNull(taskManagerActions); this.libraryCache = checkNotNull(libraryCache); this.fileCache = checkNotNull(fileCache); @@ -380,6 +385,10 @@ public class Task implements Runnable, TaskActions { return executionId; } + public AllocationID getAllocationID() { + return allocationID; + } + public TaskInfo getTaskInfo() { return taskInfo; } @@ -600,7 +609,7 @@ public class Task implements Runnable, TaskActions { // notify everyone that we switched to running notifyObservers(ExecutionState.RUNNING, null); - taskManagerConnection.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)); + taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)); // make sure the user code classloader is accessible thread-locally executingThread.setContextClassLoader(userCodeClassLoader); @@ -793,11 +802,11 @@ public class Task implements Runnable, TaskActions { } private void notifyFinalState() { - taskManagerConnection.notifyFinalState(executionId); + taskManagerActions.notifyFinalState(executionId); } private void notifyFatalError(String message, Throwable cause) { - taskManagerConnection.notifyFatalError(message, cause); + taskManagerActions.notifyFatalError(message, cause); } // ---------------------------------------------------------------------------------------------------------------- @@ -823,7 +832,7 @@ public class Task implements Runnable, TaskActions { ((StoppableTask)Task.this.invokable).stop(); } catch(RuntimeException e) { LOG.error("Stopping task " + taskNameWithSubtask + " failed.", e); - taskManagerConnection.failTask(executionId, e); + taskManagerActions.failTask(executionId, e); } } }; http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java index 60aadf5..877cc1e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java @@ -24,6 +24,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.util.SerializedThrowable; +import java.io.Serializable; + /** * This class represents an update about a task's execution state. * @@ -34,7 +36,7 @@ import org.apache.flink.runtime.util.SerializedThrowable; * exception field transient and deserialized it lazily, with the * appropriate class loader. */ -public class TaskExecutionState implements java.io.Serializable { +public class TaskExecutionState implements Serializable { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java index 60beae0..31c518a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import 
org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.util.InstantiationUtil; @@ -63,35 +64,45 @@ public class TaskInputSplitProvider implements InputSplitProvider { } @Override - public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) { + public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException { Preconditions.checkNotNull(userCodeClassLoader); + final Future<Object> response = jobManager.ask( + new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID), + timeout); + + final Object result; + try { - final Future<Object> response = jobManager.ask( - new JobManagerMessages.RequestNextInputSplit(jobID, vertexID, executionID), - timeout); + result = Await.result(response, timeout); + } catch (Exception e) { + throw new InputSplitProviderException("Did not receive next input split from JobManager.", e); + } - final Object result = Await.result(response, timeout); + if(result instanceof JobManagerMessages.NextInputSplit){ + final JobManagerMessages.NextInputSplit nextInputSplit = + (JobManagerMessages.NextInputSplit) result; - if(result instanceof JobManagerMessages.NextInputSplit){ - final JobManagerMessages.NextInputSplit nextInputSplit = - (JobManagerMessages.NextInputSplit) result; + byte[] serializedData = nextInputSplit.splitData(); - byte[] serializedData = nextInputSplit.splitData(); + if(serializedData == null) { + return null; + } else { + final Object deserialized; - if(serializedData == null) { - return null; - } else { - Object deserialized = InstantiationUtil.deserializeObject(serializedData, + try { + deserialized = InstantiationUtil.deserializeObject(serializedData, userCodeClassLoader); - return (InputSplit) deserialized; + } catch (Exception e) { + throw new InputSplitProviderException("Could not deserialize the serialized input split.", e); } - } else { - throw new 
Exception("RequestNextInputSplit requires a response of type " + - "NextInputSplit. Instead response is of type " + result.getClass() + '.'); + + return (InputSplit) deserialized; } - } catch (Exception e) { - throw new RuntimeException("Requesting the next InputSplit failed.", e); + } else { + throw new InputSplitProviderException("RequestNextInputSplit requires a response of type " + + "NextInputSplit. Instead response is of type " + result.getClass() + '.'); } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java new file mode 100644 index 0000000..2f3a0cb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerActions.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +/** + * Interface for the communication of the {@link Task} with the {@link TaskManager}. + */ +public interface TaskManagerActions { + + /** + * Notifies the task manager that the given task is in a final state. + * + * @param executionAttemptID Execution attempt ID of the task + */ + void notifyFinalState(ExecutionAttemptID executionAttemptID); + + /** + * Notifies the task manager about a fatal error occurred in the task. + * + * @param message Message to report + * @param cause Cause of the fatal error + */ + void notifyFatalError(String message, Throwable cause); + + /** + * Tells the task manager to fail the given task. + * + * @param executionAttemptID Execution attempt ID of the task to fail + * @param cause Cause of the failure + */ + void failTask(ExecutionAttemptID executionAttemptID, Throwable cause); + + /** + * Notifies the task manager about the task execution state update. + * + * @param taskExecutionState Task execution state update + */ + void updateTaskExecutionState(TaskExecutionState taskExecutionState); +} http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java deleted file mode 100644 index dc1b40f..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerConnection.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskmanager; - -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; - -/** - * Interface for the communication of the {@link Task} with the {@link TaskManager}. - */ -public interface TaskManagerConnection { - - /** - * Notifies the task manager that the given task is in a final state. - * - * @param executionAttemptID Execution attempt ID of the task - */ - void notifyFinalState(ExecutionAttemptID executionAttemptID); - - /** - * Notifies the task manager about a fatal error occurred in the task. - * - * @param message Message to report - * @param cause Cause of the fatal error - */ - void notifyFatalError(String message, Throwable cause); - - /** - * Tells the task manager to fail the given task. - * - * @param executionAttemptID Execution attempt ID of the task to fail - * @param cause Cause of the failure - */ - void failTask(ExecutionAttemptID executionAttemptID, Throwable cause); - - /** - * Notifies the task manager about the task execution state update. 
- * - * @param taskExecutionState Task execution state update - */ - void updateTaskExecutionState(TaskExecutionState taskExecutionState); -} http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 7a764ca..da8c14e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -25,7 +25,6 @@ import java.net.{InetAddress, InetSocketAddress} import java.util import java.util.UUID import java.util.concurrent.TimeUnit -import javax.management.ObjectName import _root_.akka.actor._ import _root_.akka.pattern.ask @@ -37,7 +36,6 @@ import com.fasterxml.jackson.databind.ObjectMapper import grizzled.slf4j.Logger import org.apache.flink.configuration._ import org.apache.flink.core.fs.FileSystem -import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge} import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.clusterframework.messages.StopCluster import org.apache.flink.runtime.clusterframework.types.ResourceID @@ -59,7 +57,7 @@ import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, Leader import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.messages.Messages._ import org.apache.flink.runtime.messages.RegistrationMessages._ -import org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStackTraceSampleFailure, ResponseStackTraceSampleSuccess, SampleTaskStackTrace, StackTraceSampleMessages, TriggerStackTraceSample} +import org.apache.flink.runtime.messages.StackTraceSampleMessages._ import 
org.apache.flink.runtime.messages.TaskManagerMessages._ import org.apache.flink.runtime.messages.TaskMessages._ import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint} @@ -68,7 +66,8 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration} import org.apache.flink.runtime.security.SecurityContext -import org.apache.flink.runtime.taskexecutor.{TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} +import org.apache.flink.runtime.taskexecutor._ +import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer import org.apache.flink.runtime.util._ import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.util.NetUtils @@ -150,7 +149,7 @@ class TaskManager( protected val bcVarManager = new BroadcastVariableManager() /** Handler for distributed files cached by this TaskManager */ - protected val fileCache = new FileCache(config.getConfiguration()) + protected val fileCache = new FileCache(config.getTmpDirPaths()) /** Registry of metrics periodically transmitted to the JobManager */ private val metricRegistry = TaskManager.createMetricsRegistry() @@ -196,7 +195,7 @@ class TaskManager( CheckpointResponder, PartitionStateChecker, ResultPartitionConsumableNotifier, - TaskManagerConnection)] = None + TaskManagerActions)] = None // -------------------------------------------------------------------------- // Actor messages and life cycle @@ -940,9 +939,9 @@ class TaskManager( val jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID.orNull) val taskManagerGateway = new AkkaActorGateway(self, leaderSessionID.orNull) - val checkpointResponder = new ActorGatewayCheckpointResponder(jobManagerGateway); + val checkpointResponder = new 
ActorGatewayCheckpointResponder(jobManagerGateway) - val taskManagerConnection = new ActorGatewayTaskManagerConnection(taskManagerGateway) + val taskManagerConnection = new ActorGatewayTaskManagerActions(taskManagerGateway) val partitionStateChecker = new ActorGatewayPartitionStateChecker( jobManagerGateway, @@ -998,7 +997,7 @@ class TaskManager( taskManagerMetricGroup = new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString) - TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network) + TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network) // watch job manager to detect when it dies context.watch(jobManager) @@ -2008,23 +2007,23 @@ object TaskManager { // Pre-processing steps for registering cpuLoad val osBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean() - - val fetchCPULoadMethod: Option[Method] = + + val fetchCPULoadMethod: Option[Method] = try { Class.forName("com.sun.management.OperatingSystemMXBean") .getMethods() - .find( _.getName() == "getProcessCpuLoad" ) + .find(_.getName() == "getProcessCpuLoad") } catch { case t: Throwable => LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + - " - CPU load metrics will not be available.") + " - CPU load metrics will not be available.") None } metricRegistry.register("cpuLoad", new Gauge[Double] { override def getValue: Double = { - try{ + try { fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0) } catch { @@ -2036,146 +2035,4 @@ object TaskManager { }) metricRegistry } - - private def instantiateStatusMetrics( - taskManagerMetricGroup: MetricGroup, - network: NetworkEnvironment) - : Unit = { - val status = taskManagerMetricGroup - .addGroup("Status") - - instantiateNetworkMetrics(status.addGroup("Network"), network) - - val jvm = status - .addGroup("JVM") - - instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader")) - 
instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector")) - instantiateMemoryMetrics(jvm.addGroup("Memory")) - instantiateThreadMetrics(jvm.addGroup("Threads")) - instantiateCPUMetrics(jvm.addGroup("CPU")) - } - - private def instantiateNetworkMetrics( - metrics: MetricGroup, - network: NetworkEnvironment) - : Unit = { - metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new FlinkGauge[Long] { - override def getValue: Long = network.getNetworkBufferPool.getTotalNumberOfMemorySegments - }) - metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new FlinkGauge[Long] { - override def getValue: Long = network.getNetworkBufferPool.getNumberOfAvailableMemorySegments - }) - } - - private def instantiateClassLoaderMetrics(metrics: MetricGroup) { - val mxBean = ManagementFactory.getClassLoadingMXBean - - metrics.gauge[Long, FlinkGauge[Long]]("ClassesLoaded", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getTotalLoadedClassCount - }) - metrics.gauge[Long, FlinkGauge[Long]]("ClassesUnloaded", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getUnloadedClassCount - }) - } - - private def instantiateGarbageCollectorMetrics(metrics: MetricGroup) { - val garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans - - for (garbageCollector <- garbageCollectors.asScala) { - val gcGroup = metrics.addGroup(garbageCollector.getName) - gcGroup.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] { - override def getValue: Long = garbageCollector.getCollectionCount - }) - gcGroup.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] { - override def getValue: Long = garbageCollector.getCollectionTime - }) - } - } - - private def instantiateMemoryMetrics(metrics: MetricGroup) { - val mxBean = ManagementFactory.getMemoryMXBean - val heap = metrics.addGroup("Heap") - heap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getHeapMemoryUsage.getUsed - }) - 
heap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getHeapMemoryUsage.getCommitted - }) - heap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getHeapMemoryUsage.getMax - }) - - val nonHeap = metrics.addGroup("NonHeap") - nonHeap.gauge[Long, FlinkGauge[Long]]("Used", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getNonHeapMemoryUsage.getUsed - }) - nonHeap.gauge[Long, FlinkGauge[Long]]("Committed", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getNonHeapMemoryUsage.getCommitted - }) - nonHeap.gauge[Long, FlinkGauge[Long]]("Max", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getNonHeapMemoryUsage.getMax - }) - - val con = ManagementFactory.getPlatformMBeanServer; - - val directObjectName = new ObjectName("java.nio:type=BufferPool,name=direct") - - val direct = metrics.addGroup("Direct") - direct.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] { - override def getValue: Long = con - .getAttribute(directObjectName, "Count").asInstanceOf[Long] - }) - direct.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] { - override def getValue: Long = con - .getAttribute(directObjectName, "MemoryUsed").asInstanceOf[Long] - }) - direct.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] { - override def getValue: Long = con - .getAttribute(directObjectName, "TotalCapacity").asInstanceOf[Long] - }) - - val mappedObjectName = new ObjectName("java.nio:type=BufferPool,name=mapped") - - val mapped = metrics.addGroup("Mapped") - mapped.gauge[Long, FlinkGauge[Long]]("Count", new FlinkGauge[Long] { - override def getValue: Long = con - .getAttribute(mappedObjectName, "Count").asInstanceOf[Long] - }) - mapped.gauge[Long, FlinkGauge[Long]]("MemoryUsed", new FlinkGauge[Long] { - override def getValue: Long = con - .getAttribute(mappedObjectName, "MemoryUsed").asInstanceOf[Long] - }) - 
mapped.gauge[Long, FlinkGauge[Long]]("TotalCapacity", new FlinkGauge[Long] { - override def getValue: Long = con - .getAttribute(mappedObjectName, "TotalCapacity").asInstanceOf[Long] - }) - } - - private def instantiateThreadMetrics(metrics: MetricGroup): Unit = { - val mxBean = ManagementFactory.getThreadMXBean - - metrics.gauge[Int, FlinkGauge[Int]]("Count", new FlinkGauge[Int] { - override def getValue: Int = mxBean.getThreadCount - }) - } - - private def instantiateCPUMetrics(metrics: MetricGroup): Unit = { - try { - val mxBean = ManagementFactory.getOperatingSystemMXBean - .asInstanceOf[com.sun.management.OperatingSystemMXBean] - - metrics.gauge[Double, FlinkGauge[Double]]("Load", new FlinkGauge[Double] { - override def getValue: Double = mxBean.getProcessCpuLoad - }) - metrics.gauge[Long, FlinkGauge[Long]]("Time", new FlinkGauge[Long] { - override def getValue: Long = mxBean.getProcessCpuTime - }) - } - catch { - case t: Throwable => - LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" + - " - CPU load metrics will not be available.") - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java index c369674..4db0d93 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.filecache; import java.io.File; import java.util.concurrent.Future; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.api.common.JobID; @@ -62,8 +61,9 @@ public class FileCacheDeleteValidationTest { @Before public void setup() { + String[] tmpDirectories = System.getProperty("java.io.tmpdir").split(",|" + File.pathSeparator); try { - fileCache = new FileCache(new Configuration()); + fileCache = new FileCache(tmpDirectories); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java index f8a0b6a..30dfef5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java @@ -42,7 +42,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 7710fa9..f5fe52c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -20,8 +20,11 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.highavailability.NonHaServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -29,6 +32,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; @@ -38,7 +42,6 @@ import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.TestLogger; -import org.hamcrest.Matchers; import org.junit.Test; import org.powermock.api.mockito.PowerMockito; @@ -60,10 +63,14 @@ public class TaskExecutorTest extends TestLogger { ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class); TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); + 
PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration()); + PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]); + rpc.registerGateway(resourceManagerAddress, rmGateway); TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); when(taskManagerLocation.getResourceID()).thenReturn(resourceID); + when(taskManagerLocation.getHostname()).thenReturn("foobar"); NonHaServices haServices = new NonHaServices(resourceManagerAddress); @@ -76,6 +83,9 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(MetricRegistry.class), + mock(TaskManagerMetricGroup.class), + mock(BroadcastVariableManager.class), + mock(FileCache.class), mock(FatalErrorHandler.class)); taskManager.start(); @@ -113,9 +123,12 @@ public class TaskExecutorTest extends TestLogger { TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); + PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration()); + PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]); TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); when(taskManagerLocation.getResourceID()).thenReturn(resourceID); + when(taskManagerLocation.getHostname()).thenReturn("foobar"); TaskExecutor taskManager = new TaskExecutor( taskManagerServicesConfiguration, @@ -126,6 +139,9 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(MetricRegistry.class), + mock(TaskManagerMetricGroup.class), + mock(BroadcastVariableManager.class), + mock(FileCache.class), mock(FatalErrorHandler.class)); taskManager.start(); @@ -182,9 +198,12 @@ public class TaskExecutorTest extends TestLogger { TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); + PowerMockito.when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration()); + PowerMockito.when(taskManagerServicesConfiguration.getTmpDirPaths()).thenReturn(new String[1]); TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); when(taskManagerLocation.getResourceID()).thenReturn(resourceID); + when(taskManagerLocation.getHostname()).thenReturn("foobar"); TaskExecutor taskManager = new TaskExecutor( taskManagerServicesConfiguration, @@ -195,6 +214,9 @@ public class TaskExecutorTest extends TestLogger { mock(NetworkEnvironment.class), haServices, mock(MetricRegistry.class), + mock(TaskManagerMetricGroup.class), + mock(BroadcastVariableManager.class), + mock(FileCache.class), mock(FatalErrorHandler.class)); taskManager.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index e2abe88..9a79935 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -173,7 +173,7 @@ public class TaskAsyncCallTest { mock(IOManager.class), networkEnvironment, mock(BroadcastVariableManager.class), - mock(TaskManagerConnection.class), + mock(TaskManagerActions.class), mock(InputSplitProvider.class), mock(CheckpointResponder.class), libCache, 
http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java index 642300d..777633d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProviderTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.instance.BaseTestingActorGateway; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.junit.Test; @@ -36,7 +37,7 @@ import java.util.concurrent.TimeUnit; public class TaskInputSplitProviderTest { @Test - public void testRequestNextInputSplitWithInvalidExecutionID() { + public void testRequestNextInputSplitWithInvalidExecutionID() throws InputSplitProviderException { final JobID jobID = new JobID(); final JobVertexID vertexID = new JobVertexID(); http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java index 9791cee..5d3eb3a 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; @@ -68,6 +69,7 @@ public class TaskStopTest { when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class)); when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class)); when(tddMock.getInvokableClassName()).thenReturn("className"); + when(tddMock.getAllocationID()).thenReturn(mock(AllocationID.class)); task = new Task( tddMock, @@ -75,7 +77,7 @@ public class TaskStopTest { mock(IOManager.class), mock(NetworkEnvironment.class), mock(BroadcastVariableManager.class), - mock(TaskManagerConnection.class), + mock(TaskManagerActions.class), mock(InputSplitProvider.class), mock(CheckpointResponder.class), mock(LibraryCacheManager.class), http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 9a13cde..fe618ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -97,7 +97,7 @@ public class TaskTest { private ActorGateway 
listenerGateway; private ActorGatewayTaskExecutionStateListener listener; - private ActorGatewayTaskManagerConnection taskManagerConnection; + private ActorGatewayTaskManagerActions taskManagerConnection; private BlockingQueue<Object> taskManagerMessages; private BlockingQueue<Object> jobManagerMessages; @@ -113,7 +113,7 @@ public class TaskTest { listenerGateway = new ForwardingActorGateway(listenerMessages); listener = new ActorGatewayTaskExecutionStateListener(listenerGateway); - taskManagerConnection = new ActorGatewayTaskManagerConnection(taskManagerGateway); + taskManagerConnection = new ActorGatewayTaskManagerActions(taskManagerGateway); awaitLatch = new OneShotLatch(); triggerLatch = new OneShotLatch(); http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java index 343affe..c067ca7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import java.util.Iterator; @@ -146,7 +147,12 @@ public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<O return true; } - 
InputSplit split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader()); + final InputSplit split; + try { + split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader()); + } catch (InputSplitProviderException e) { + throw new RuntimeException("Could not retrieve next input split.", e); + } if (split != null) { this.nextSplit = split; http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index b5b6582..ffda126 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -47,7 +47,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.runtime.taskmanager.TaskManagerConnection; +import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -59,6 +59,7 @@ import org.apache.flink.util.SerializedValue; import org.junit.Test; import java.io.EOFException; +import java.io.File; import java.io.IOException; import java.io.Serializable; import java.net.URL; @@ -153,17 +154,19 @@ public class InterruptSensitiveRestoreTest { 
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); + String[] tmpDirectories = EnvironmentInformation.getTemporaryFileDirectory().split(",|" + File.pathSeparator); + return new Task( tdd, mock(MemoryManager.class), mock(IOManager.class), networkEnvironment, mock(BroadcastVariableManager.class), - mock(TaskManagerConnection.class), + mock(TaskManagerActions.class), mock(InputSplitProvider.class), mock(CheckpointResponder.class), new FallbackLibraryCacheManager(), - new FileCache(new Configuration()), + new FileCache(tmpDirectories), new TaskManagerRuntimeInfo( "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()), new UnregisteredTaskMetricsGroup(), @@ -266,4 +269,4 @@ public class InterruptSensitiveRestoreTest { fail("should never be called"); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/a00619a4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 8aae19f..106d3df 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -51,7 +51,7 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener; -import org.apache.flink.runtime.taskmanager.TaskManagerConnection; +import org.apache.flink.runtime.taskmanager.TaskManagerActions; import 
org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -238,7 +238,7 @@ public class StreamTaskTest { mock(IOManager.class), network, mock(BroadcastVariableManager.class), - mock(TaskManagerConnection.class), + mock(TaskManagerActions.class), mock(InputSplitProvider.class), mock(CheckpointResponder.class), libCache,