This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6349db7df4576eaa637d449ee45d76d7232c48ca Author: Andrey Zagrebin <azagre...@gmail.com> AuthorDate: Thu May 30 15:16:16 2019 +0200 [FLINK-11392][network] Introduce ShuffleEnvironment interface --- .../runtime/io/network/NetworkEnvironment.java | 107 ++++++------- .../network/api/writer/ResultPartitionWriter.java | 12 +- .../flink/runtime/shuffle/ShuffleEnvironment.java | 166 +++++++++++++++++++++ .../runtime/shuffle/ShuffleEnvironmentContext.java | 94 ++++++++++++ .../flink/runtime/taskexecutor/TaskExecutor.java | 15 +- .../runtime/taskexecutor/TaskManagerRunner.java | 13 +- .../runtime/taskexecutor/TaskManagerServices.java | 74 ++++----- .../TaskManagerServicesConfiguration.java | 59 +++++++- .../org/apache/flink/runtime/taskmanager/Task.java | 12 +- .../runtime/util/ConfigurationParserUtils.java | 8 +- .../io/network/NetworkEnvironmentBuilder.java | 2 +- .../runtime/io/network/NetworkEnvironmentTest.java | 4 +- .../io/network/partition/ResultPartitionTest.java | 2 +- .../partition/consumer/SingleInputGateTest.java | 12 +- .../TaskExecutorLocalStateStoresManagerTest.java | 18 +-- .../taskexecutor/TaskExecutorSubmissionTest.java | 18 +-- .../runtime/taskexecutor/TaskExecutorTest.java | 10 +- .../taskexecutor/TaskManagerServicesBuilder.java | 12 +- .../TaskSubmissionTestEnvironment.java | 45 +++--- .../runtime/taskmanager/TaskAsyncCallTest.java | 14 +- .../apache/flink/runtime/taskmanager/TaskTest.java | 27 ++-- .../runtime/util/JvmExitOnFatalErrorTest.java | 6 +- .../StreamNetworkBenchmarkEnvironment.java | 4 +- .../tasks/InterruptSensitiveRestoreTest.java | 6 +- .../runtime/tasks/StreamTaskTerminationTest.java | 6 +- .../streaming/runtime/tasks/StreamTaskTest.java | 6 +- .../runtime/tasks/SynchronousCheckpointITCase.java | 6 +- .../tasks/TaskCheckpointingBehaviourTest.java | 6 +- 28 files changed, 539 insertions(+), 225 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 01c62ba..8483f84 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -50,7 +49,8 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFac import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; -import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; +import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.util.Preconditions; @@ -58,7 +58,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.InetAddress; +import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -68,10 +68,11 @@ import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Network I/O components of each {@link TaskExecutor} instance. The network environment contains - * the data structures that keep track of all intermediate results and all data exchanges. + * The implementation of {@link ShuffleEnvironment} based on netty network communication, local memory and disk files. 
+ * The network environment contains the data structures that keep track of all intermediate results + * and shuffle data exchanges. */ -public class NetworkEnvironment { +public class NetworkEnvironment implements ShuffleEnvironment<ResultPartition, SingleInputGate> { private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class); @@ -102,7 +103,7 @@ public class NetworkEnvironment { private final SingleInputGateFactory singleInputGateFactory; - private boolean isShutdown; + private boolean isClosed; private NetworkEnvironment( ResourceID taskExecutorLocation, @@ -120,12 +121,12 @@ public class NetworkEnvironment { this.inputGatesById = new ConcurrentHashMap<>(); this.resultPartitionFactory = resultPartitionFactory; this.singleInputGateFactory = singleInputGateFactory; - this.isShutdown = false; + this.isClosed = false; } public static NetworkEnvironment create( - ResourceID taskExecutorLocation, NetworkEnvironmentConfiguration config, + ResourceID taskExecutorLocation, TaskEventPublisher taskEventPublisher, MetricGroup metricGroup, IOManager ioManager) { @@ -213,11 +214,7 @@ public class NetworkEnvironment { return Optional.ofNullable(inputGatesById.get(id)); } - /** - * Batch release intermediate result partitions. - * - * @param partitionIds partition ids to release - */ + @Override public void releasePartitions(Collection<ResultPartitionID> partitionIds) { for (ResultPartitionID partitionId : partitionIds) { resultPartitionManager.releasePartition(partitionId, null); @@ -230,7 +227,8 @@ public class NetworkEnvironment { * @return collection of partitions which still occupy some resources locally on this task executor * and have been not released yet. 
*/ - public Collection<ResultPartitionID> getUnreleasedPartitions() { + @Override + public Collection<ResultPartitionID> getPartitionsOccupyingLocalResources() { return resultPartitionManager.getUnreleasedPartitions(); } @@ -238,54 +236,56 @@ public class NetworkEnvironment { // Create Output Writers and Input Readers // -------------------------------------------------------------------------------------------- - public ResultPartition[] createResultPartitionWriters( - String taskName, - ExecutionAttemptID executionId, + @Override + public Collection<ResultPartition> createResultPartitionWriters( + String ownerName, + ExecutionAttemptID executionAttemptID, Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors, MetricGroup outputGroup, MetricGroup buffersGroup) { synchronized (lock) { - Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down."); + Preconditions.checkState(!isClosed, "The NetworkEnvironment has already been shut down."); ResultPartition[] resultPartitions = new ResultPartition[resultPartitionDeploymentDescriptors.size()]; int counter = 0; for (ResultPartitionDeploymentDescriptor rpdd : resultPartitionDeploymentDescriptors) { - resultPartitions[counter++] = resultPartitionFactory.create(taskName, executionId, rpdd); + resultPartitions[counter++] = resultPartitionFactory.create(ownerName, executionAttemptID, rpdd); } registerOutputMetrics(outputGroup, buffersGroup, resultPartitions); - return resultPartitions; + return Arrays.asList(resultPartitions); } } - public SingleInputGate[] createInputGates( - String taskName, - ExecutionAttemptID executionId, + @Override + public Collection<SingleInputGate> createInputGates( + String ownerName, + ExecutionAttemptID executionAttemptID, PartitionProducerStateProvider partitionProducerStateProvider, Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors, MetricGroup parentGroup, MetricGroup inputGroup, MetricGroup buffersGroup) { 
synchronized (lock) { - Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down."); + Preconditions.checkState(!isClosed, "The NetworkEnvironment has already been shut down."); InputChannelMetrics inputChannelMetrics = new InputChannelMetrics(parentGroup); SingleInputGate[] inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()]; int counter = 0; for (InputGateDeploymentDescriptor igdd : inputGateDeploymentDescriptors) { SingleInputGate inputGate = singleInputGateFactory.create( - taskName, + ownerName, igdd, partitionProducerStateProvider, inputChannelMetrics); - InputGateID id = new InputGateID(igdd.getConsumedResultId(), executionId); + InputGateID id = new InputGateID(igdd.getConsumedResultId(), executionAttemptID); inputGatesById.put(id, inputGate); inputGate.getCloseFuture().thenRun(() -> inputGatesById.remove(id)); inputGates[counter++] = inputGate; } registerInputMetrics(inputGroup, buffersGroup, inputGates); - return inputGates; + return Arrays.asList(inputGates); } } @@ -305,16 +305,7 @@ public class NetworkEnvironment { buffersGroup.gauge(METRIC_INPUT_POOL_USAGE, new InputBufferPoolUsageGauge(inputGates)); } - /** - * Update consuming gate with newly available partition. - * - * @param consumerID execution id of consumer to identify belonging to it gate. - * @param partitionInfo telling where the partition can be retrieved from - * @return {@code true} if the partition has been updated or {@code false} if the partition is not available anymore. 
- * @throws IOException IO problem by the update - * @throws InterruptedException potentially blocking operation was interrupted - * @throws IllegalStateException the input gate with the id from the partitionInfo is not found - */ + @Override public boolean updatePartitionInfo( ExecutionAttemptID consumerID, PartitionInfo partitionInfo) throws IOException, InterruptedException { @@ -337,9 +328,10 @@ public class NetworkEnvironment { * * @return a port to connect to the task executor for shuffle data exchange, -1 if only local connection is possible. */ + @Override public int start() throws IOException { synchronized (lock) { - Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down."); + Preconditions.checkState(!isClosed, "The NetworkEnvironment has already been shut down."); LOG.info("Starting the network environment and its components."); @@ -355,9 +347,10 @@ public class NetworkEnvironment { /** * Tries to shut down all network I/O components. */ - public void shutdown() { + @Override + public void close() { synchronized (lock) { - if (isShutdown) { + if (isClosed) { return; } @@ -392,29 +385,27 @@ public class NetworkEnvironment { LOG.warn("Network buffer pool did not shut down properly.", t); } - isShutdown = true; + isClosed = true; } } - public boolean isShutdown() { + public boolean isClosed() { synchronized (lock) { - return isShutdown; + return isClosed; } } - public static NetworkEnvironment fromConfiguration( - Configuration configuration, - TaskEventPublisher taskEventPublisher, - MetricGroup metricGroup, - IOManager ioManager, - long maxJvmHeapMemory, - boolean localTaskManagerCommunication, - InetAddress taskManagerAddress) { - final NetworkEnvironmentConfiguration networkConfig = NetworkEnvironmentConfiguration.fromConfiguration( - configuration, - maxJvmHeapMemory, - localTaskManagerCommunication, - taskManagerAddress); - return NetworkEnvironment.create(networkConfig, taskEventPublisher, metricGroup, ioManager); + 
public static NetworkEnvironment fromShuffleContext(ShuffleEnvironmentContext shuffleEnvironmentContext) { + NetworkEnvironmentConfiguration networkConfig = NetworkEnvironmentConfiguration.fromConfiguration( + shuffleEnvironmentContext.getConfiguration(), + shuffleEnvironmentContext.getMaxJvmHeapMemory(), + shuffleEnvironmentContext.isLocalCommunicationOnly(), + shuffleEnvironmentContext.getHostAddress()); + return create( + networkConfig, + shuffleEnvironmentContext.getLocation(), + shuffleEnvironmentContext.getEventPublisher(), + shuffleEnvironmentContext.getParentMetricGroup(), + shuffleEnvironmentContext.getIOManager()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java index 6c869e9..1945bb0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java @@ -28,6 +28,11 @@ import java.io.IOException; /** * A buffer-oriented runtime result writer API for producing results. + * + * <p>If {@link ResultPartitionWriter#close()} is called before {@link ResultPartitionWriter#fail(Throwable)} or + * {@link ResultPartitionWriter#finish()}, it abruptly triggers failure and cancellation of production. + * In this case {@link ResultPartitionWriter#fail(Throwable)} still needs to be called afterwards to fully release + * all resources associated with the partition and propagate failure cause to the consumer if possible. */ public interface ResultPartitionWriter extends AutoCloseable { @@ -73,8 +78,9 @@ public interface ResultPartitionWriter extends AutoCloseable { /** * Fail the production of the partition. * - * <p>This method propagates non-{@code null} failure causes to consumers on a best-effort basis. - * Closing of partition is still needed. 
+ * <p>This method propagates non-{@code null} failure causes to consumers on a best-effort basis. This call also + * leads to the release of all resources associated with the partition. Closing of the partition is still needed + * afterwards if it has not been done before. * * @param throwable failure cause */ @@ -83,7 +89,7 @@ public interface ResultPartitionWriter extends AutoCloseable { /** * Successfully finish the production of the partition. * - * <p>Closing of partition is still needed. + * <p>Closing of partition is still needed afterwards. */ void finish() throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java new file mode 100644 index 0000000..882e52c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; + +import java.io.IOException; +import java.util.Collection; + +/** + * Interface for the implementation of shuffle service local environment. + * + * <p>Input/Output interface of local shuffle service environment is based on memory {@link Buffer Buffers}. A producer + * can write shuffle data into the buffers, obtained from the created {@link ResultPartitionWriter ResultPartitionWriters} + * and a consumer reads the buffers from the created {@link InputGate InputGates}. + * + * <h1>Lifecycle management.</h1> + * + * <p>The interface contains methods to manage the lifecycle of the local shuffle service environment: + * <ol> + * <li>{@link ShuffleEnvironment#start} must be called before using the shuffle service environment.</li> + * <li>{@link ShuffleEnvironment#close} is called to release the shuffle service environment.</li> + * </ol> + * + * <h1>Shuffle Input/Output management.</h1> + * + * <h2>Result partition management.</h2> + * + * <p>The interface implements a factory of result partition writers to produce shuffle data: + * {@link ShuffleEnvironment#createResultPartitionWriters}. The created writers are grouped per owner. + * The owner is responsible for the writers' lifecycle from the moment of creation. 
+ * + * <p>Partitions are released in the following cases: + * <ol> + * <li>{@link ResultPartitionWriter#fail(Throwable)} and {@link ResultPartitionWriter#close()} are called + * if the production has failed. + * </li> + * <li>{@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called + * if the production is done. The actual release can take some time depending on implementation details, + * e.g. if the `end of consumption' confirmation from the consumer is being awaited implicitly + * or the partition is later released by {@link ShuffleEnvironment#releasePartitions(Collection)}.</li> + * <li>{@link ShuffleEnvironment#releasePartitions(Collection)} is called outside of the producer thread, + * e.g. to manage the lifecycle of BLOCKING result partitions which can outlive their producers.</li> + * </ol> + * The partitions, which currently still occupy local resources, can be queried with + * {@link ShuffleEnvironment#getPartitionsOccupyingLocalResources}. + * + * <h2>Input gate management.</h2> + * + * <p>The interface implements a factory for the input gates: {@link ShuffleEnvironment#createInputGates}. + * The created gates are grouped per owner. The owner is responsible for the gates' lifecycle from the moment of creation. + * + * <p>When the input gates are created, it can happen that not all consumed partitions are known at that moment + * e.g. because their producers have not been started yet. Therefore, the {@link ShuffleEnvironment} provides + * a method {@link ShuffleEnvironment#updatePartitionInfo} to update them externally, when the producer becomes known. + * The update mechanism has to be threadsafe because the updated gate can be read concurrently from a different thread. 
+ * + * @param <P> type of provided result partition writers + * @param <G> type of provided input gates + */ +public interface ShuffleEnvironment<P extends ResultPartitionWriter, G extends InputGate> extends AutoCloseable { + + /** + * Start the internal related services before using the shuffle service environment. + * + * @return a port to connect for the shuffle data exchange, -1 if only local connection is possible. + */ + int start() throws IOException; + + /** + * Factory method for the {@link ResultPartitionWriter ResultPartitionWriters} to produce result partitions. + * + * <p>The order of the {@link ResultPartitionWriter ResultPartitionWriters} in the returned collection + * should be the same as the iteration order of the passed {@code resultPartitionDeploymentDescriptors}. + * + * @param ownerName the owner name, used for logs + * @param executionAttemptID execution attempt id of the producer + * @param resultPartitionDeploymentDescriptors descriptors of the partition, produced by the owner + * @param outputGroup shuffle specific group for output metrics + * @param buffersGroup shuffle specific group for buffer metrics + * @return collection of the {@link ResultPartitionWriter ResultPartitionWriters} + */ + Collection<P> createResultPartitionWriters( + String ownerName, + ExecutionAttemptID executionAttemptID, + Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors, + MetricGroup outputGroup, + MetricGroup buffersGroup); + + /** + * Release local resources occupied by the given partitions. + * + * @param partitionIds identifying the partitions to be released + */ + void releasePartitions(Collection<ResultPartitionID> partitionIds); + + /** + * Report partitions which still occupy some resources locally. + * + * @return collection of partitions which still occupy some resources locally + * and have not been released yet. 
+ */ + Collection<ResultPartitionID> getPartitionsOccupyingLocalResources(); + + /** + * Factory method for the {@link InputGate InputGates} to consume result partitions. + * + * <p>The order of the {@link InputGate InputGates} in the returned collection should be the same as the iteration order + * of the passed {@code inputGateDeploymentDescriptors}. + * + * @param ownerName the owner name, used for logs + * @param executionAttemptID execution attempt id of the consumer + * @param partitionProducerStateProvider producer state provider to query whether the producer is ready for consumption + * @param inputGateDeploymentDescriptors descriptors of the input gates to consume + * @param parentGroup parent of shuffle specific metric group + * @param inputGroup shuffle specific group for input metrics + * @param buffersGroup shuffle specific group for buffer metrics + * @return collection of the {@link InputGate InputGates} + */ + Collection<G> createInputGates( + String ownerName, + ExecutionAttemptID executionAttemptID, + PartitionProducerStateProvider partitionProducerStateProvider, + Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors, + MetricGroup parentGroup, + MetricGroup inputGroup, + MetricGroup buffersGroup); + + /** + * Update a gate with the newly available partition information, previously unknown. + * + * @param consumerID execution id to distinguish gates with the same id from the different consumer executions + * @param partitionInfo information needed to consume the updated partition, e.g. network location + * @return {@code true} if the partition has been updated or {@code false} if the partition is not available anymore. 
+ * @throws IOException IO problem by the update + * @throws InterruptedException potentially blocking operation was interrupted + */ + boolean updatePartitionInfo( + ExecutionAttemptID consumerID, + PartitionInfo partitionInfo) throws IOException, InterruptedException; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java new file mode 100644 index 0000000..f7108b6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironmentContext.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.TaskEventPublisher; + +import java.net.InetAddress; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Local context used to create {@link ShuffleEnvironment}. 
+ */ +public class ShuffleEnvironmentContext { + private final Configuration configuration; + private final ResourceID location; + private final long maxJvmHeapMemory; + private final boolean localCommunicationOnly; + private final InetAddress hostAddress; + private final TaskEventPublisher eventPublisher; + private final MetricGroup parentMetricGroup; + private final IOManager ioManager; + + public ShuffleEnvironmentContext( + Configuration configuration, + ResourceID location, + long maxJvmHeapMemory, + boolean localCommunicationOnly, + InetAddress hostAddress, + TaskEventPublisher eventPublisher, + MetricGroup parentMetricGroup, + IOManager ioManager) { + this.configuration = checkNotNull(configuration); + this.location = checkNotNull(location); + this.maxJvmHeapMemory = maxJvmHeapMemory; + this.localCommunicationOnly = localCommunicationOnly; + this.hostAddress = checkNotNull(hostAddress); + this.eventPublisher = checkNotNull(eventPublisher); + this.parentMetricGroup = checkNotNull(parentMetricGroup); + this.ioManager = checkNotNull(ioManager); + } + + public Configuration getConfiguration() { + return configuration; + } + + public ResourceID getLocation() { + return location; + } + + public long getMaxJvmHeapMemory() { + return maxJvmHeapMemory; + } + + public boolean isLocalCommunicationOnly() { + return localCommunicationOnly; + } + + public InetAddress getHostAddress() { + return hostAddress; + } + + public TaskEventPublisher getEventPublisher() { + return eventPublisher; + } + + public MetricGroup getParentMetricGroup() { + return parentMetricGroup; + } + + public IOManager getIOManager() { + return ioManager; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index e192824..a31521e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -48,7 +48,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatTarget; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; @@ -169,7 +169,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private final TaskExecutorLocalStateStoresManager localStateStoresManager; /** The network component in the task manager. */ - private final NetworkEnvironment networkEnvironment; + private final ShuffleEnvironment shuffleEnvironment; /** The kvState registration service in the task manager. 
*/ private final KvStateService kvStateService; @@ -238,7 +238,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { this.jobLeaderService = taskExecutorServices.getJobLeaderService(); this.taskManagerLocation = taskExecutorServices.getTaskManagerLocation(); this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore(); - this.networkEnvironment = taskExecutorServices.getNetworkEnvironment(); + this.shuffleEnvironment = taskExecutorServices.getShuffleEnvironment(); this.kvStateService = taskExecutorServices.getKvStateService(); this.resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever(); @@ -270,8 +270,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { @Override public CompletableFuture<Boolean> canBeReleased() { - return CompletableFuture.completedFuture( - taskExecutorServices.getNetworkEnvironment().getUnreleasedPartitions().isEmpty()); + return CompletableFuture.completedFuture(shuffleEnvironment.getPartitionsOccupyingLocalResources().isEmpty()); } // ------------------------------------------------------------------------ @@ -533,7 +532,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { tdd.getTargetSlotNumber(), taskExecutorServices.getMemoryManager(), taskExecutorServices.getIOManager(), - taskExecutorServices.getNetworkEnvironment(), + taskExecutorServices.getShuffleEnvironment(), taskExecutorServices.getKvStateService(), taskExecutorServices.getBroadcastVariableManager(), taskExecutorServices.getTaskEventDispatcher(), @@ -615,7 +614,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { CompletableFuture.runAsync( () -> { try { - if (!networkEnvironment.updatePartitionInfo(executionAttemptID, partitionInfo)) { + if (!shuffleEnvironment.updatePartitionInfo(executionAttemptID, partitionInfo)) { log.debug( "Discard update for input gate partition {} of result {} in task {}. 
" + "The partition is no longer available.", @@ -643,7 +642,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { @Override public void releasePartitions(Collection<ResultPartitionID> partitionIds) { try { - networkEnvironment.releasePartitions(partitionIds); + shuffleEnvironment.releasePartitions(partitionIds); } catch (Throwable t) { // TODO: Do we still need this catch branch? onFatalError(t); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 57a7f2b..d5985b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -356,7 +356,11 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration( configuration, - remoteAddress); + resourceID, + remoteAddress, + EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(), + EnvironmentInformation.getMaxJvmHeapMemory(), + localCommunicationOnly); Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( metricRegistry, @@ -365,14 +369,9 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval()); TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( - configuration, taskManagerServicesConfiguration, taskManagerMetricGroup.f1, - resourceID, - rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io. 
- EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(), - EnvironmentInformation.getMaxJvmHeapMemory(), - localCommunicationOnly); + rpcService.getExecutor()); // TODO replace this later with some dedicated executor for io. TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 266a38c..a8e909a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -27,13 +27,14 @@ import org.apache.flink.core.memory.MemoryType; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; +import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -58,7 +59,7 @@ import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES; /** * Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager}, - * 
{@link NetworkEnvironment}. All services are exclusive to a single {@link TaskExecutor}. + * {@link ShuffleEnvironment}. All services are exclusive to a single {@link TaskExecutor}. * Consequently, the respective {@link TaskExecutor} is responsible for closing them. */ public class TaskManagerServices { @@ -71,7 +72,7 @@ public class TaskManagerServices { private final TaskManagerLocation taskManagerLocation; private final MemoryManager memoryManager; private final IOManager ioManager; - private final NetworkEnvironment networkEnvironment; + private final ShuffleEnvironment shuffleEnvironment; private final KvStateService kvStateService; private final BroadcastVariableManager broadcastVariableManager; private final TaskSlotTable taskSlotTable; @@ -84,7 +85,7 @@ public class TaskManagerServices { TaskManagerLocation taskManagerLocation, MemoryManager memoryManager, IOManager ioManager, - NetworkEnvironment networkEnvironment, + ShuffleEnvironment shuffleEnvironment, KvStateService kvStateService, BroadcastVariableManager broadcastVariableManager, TaskSlotTable taskSlotTable, @@ -96,7 +97,7 @@ public class TaskManagerServices { this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); this.memoryManager = Preconditions.checkNotNull(memoryManager); this.ioManager = Preconditions.checkNotNull(ioManager); - this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment); + this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment); this.kvStateService = Preconditions.checkNotNull(kvStateService); this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager); this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable); @@ -118,8 +119,8 @@ public class TaskManagerServices { return ioManager; } - public NetworkEnvironment getNetworkEnvironment() { - return networkEnvironment; + public ShuffleEnvironment getShuffleEnvironment() { + return shuffleEnvironment; } public KvStateService getKvStateService() { @@ 
-184,7 +185,7 @@ public class TaskManagerServices { } try { - networkEnvironment.shutdown(); + shuffleEnvironment.close(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } @@ -221,27 +222,16 @@ public class TaskManagerServices { /** * Creates and returns the task manager services. * - * @param configuration Flink configuration. * @param taskManagerServicesConfiguration task manager configuration * @param taskManagerMetricGroup metric group of the task manager - * @param resourceID resource ID of the task manager * @param taskIOExecutor executor for async IO operations - * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory - * @param maxJvmHeapMemory the maximum JVM heap size - * @param localCommunicationOnly True, to skip initializing the network stack. - * Use only in cases where only one task manager runs. * @return task manager components * @throws Exception */ public static TaskManagerServices fromConfiguration( - Configuration configuration, TaskManagerServicesConfiguration taskManagerServicesConfiguration, MetricGroup taskManagerMetricGroup, - ResourceID resourceID, - Executor taskIOExecutor, - long freeHeapMemoryWithDefrag, - long maxJvmHeapMemory, - boolean localCommunicationOnly) throws Exception { + Executor taskIOExecutor) throws Exception { // pre-start checks checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths()); @@ -251,26 +241,23 @@ public class TaskManagerServices { // start the I/O manager, it will create some temp directories. 
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths()); - final NetworkEnvironment network = NetworkEnvironment.fromConfiguration( - configuration, + final ShuffleEnvironment shuffleEnvironment = createShuffleEnvironment( + taskManagerServicesConfiguration, taskEventDispatcher, taskManagerMetricGroup, - ioManager, - maxJvmHeapMemory, - localCommunicationOnly, - taskManagerServicesConfiguration.getTaskManagerAddress()); - int dataPort = network.start(); + ioManager); + final int dataPort = shuffleEnvironment.start(); final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration); kvStateService.start(); final TaskManagerLocation taskManagerLocation = new TaskManagerLocation( - resourceID, + taskManagerServicesConfiguration.getResourceID(), taskManagerServicesConfiguration.getTaskManagerAddress(), dataPort); // this call has to happen strictly after the network stack has been initialized - final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory); + final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration); final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager(); @@ -307,7 +294,7 @@ public class TaskManagerServices { taskManagerLocation, memoryManager, ioManager, - network, + shuffleEnvironment, kvStateService, broadcastVariableManager, taskSlotTable, @@ -317,19 +304,34 @@ public class TaskManagerServices { taskEventDispatcher); } + private static ShuffleEnvironment createShuffleEnvironment( + TaskManagerServicesConfiguration taskManagerServicesConfiguration, + TaskEventDispatcher taskEventDispatcher, + MetricGroup taskManagerMetricGroup, + IOManager ioManager) { + + final ShuffleEnvironmentContext shuffleEnvironmentContext = new ShuffleEnvironmentContext( + taskManagerServicesConfiguration.getConfiguration(), + 
taskManagerServicesConfiguration.getResourceID(), + taskManagerServicesConfiguration.getMaxJvmHeapMemory(), + taskManagerServicesConfiguration.isLocalCommunicationOnly(), + taskManagerServicesConfiguration.getTaskManagerAddress(), + taskEventDispatcher, + taskManagerMetricGroup, + ioManager); + + return NetworkEnvironment.fromShuffleContext(shuffleEnvironmentContext); + } + /** * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}. * * @param taskManagerServicesConfiguration to create the memory manager from - * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory - * @param maxJvmHeapMemory the maximum JVM heap size * @return Memory manager * @throws Exception */ private static MemoryManager createMemoryManager( - TaskManagerServicesConfiguration taskManagerServicesConfiguration, - long freeHeapMemoryWithDefrag, - long maxJvmHeapMemory) throws Exception { + TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws Exception { // computing the amount of memory to use depends on how much memory is available // it strictly needs to happen AFTER the network stack has been initialized @@ -354,6 +356,7 @@ public class TaskManagerServices { float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction(); if (memType == MemoryType.HEAP) { + long freeHeapMemoryWithDefrag = taskManagerServicesConfiguration.getFreeHeapMemoryWithDefrag(); // network buffers allocated off-heap -> use memoryFraction of the available heap: long relativeMemSize = (long) (freeHeapMemoryWithDefrag * memoryFraction); if (preAllocateMemory) { @@ -365,6 +368,7 @@ public class TaskManagerServices { } memorySize = relativeMemSize; } else if (memType == MemoryType.OFF_HEAP) { + long maxJvmHeapMemory = taskManagerServicesConfiguration.getMaxJvmHeapMemory(); // The maximum heap memory has been adjusted according to the fraction (see // calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e. 
// maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index f88444e..64f5a30 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemoryType; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; import org.apache.flink.runtime.util.ConfigurationParserUtils; @@ -42,8 +43,14 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class TaskManagerServicesConfiguration { + private final Configuration configuration; + + private final ResourceID resourceID; + private final InetAddress taskManagerAddress; + private final boolean localCommunicationOnly; + private final String[] tmpDirPaths; private final String[] localRecoveryStateRootDirectories; @@ -53,6 +60,10 @@ public class TaskManagerServicesConfiguration { @Nullable private final QueryableStateConfiguration queryableStateConfig; + private final long freeHeapMemoryWithDefrag; + + private final long maxJvmHeapMemory; + /** * Managed memory (in megabytes). 
* @@ -77,9 +88,14 @@ public class TaskManagerServicesConfiguration { private Optional<Time> systemResourceMetricsProbingInterval; public TaskManagerServicesConfiguration( + Configuration configuration, + ResourceID resourceID, InetAddress taskManagerAddress, + boolean localCommunicationOnly, String[] tmpDirPaths, String[] localRecoveryStateRootDirectories, + long freeHeapMemoryWithDefrag, + long maxJvmHeapMemory, boolean localRecoveryEnabled, @Nullable QueryableStateConfiguration queryableStateConfig, int numberOfSlots, @@ -91,10 +107,15 @@ public class TaskManagerServicesConfiguration { long timerServiceShutdownTimeout, RetryingRegistrationConfiguration retryingRegistrationConfiguration, Optional<Time> systemResourceMetricsProbingInterval) { + this.configuration = checkNotNull(configuration); + this.resourceID = checkNotNull(resourceID); this.taskManagerAddress = checkNotNull(taskManagerAddress); + this.localCommunicationOnly = localCommunicationOnly; this.tmpDirPaths = checkNotNull(tmpDirPaths); this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories); + this.freeHeapMemoryWithDefrag = freeHeapMemoryWithDefrag; + this.maxJvmHeapMemory = maxJvmHeapMemory; this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled); this.queryableStateConfig = queryableStateConfig; this.numberOfSlots = checkNotNull(numberOfSlots); @@ -117,10 +138,22 @@ public class TaskManagerServicesConfiguration { // Getter/Setter // -------------------------------------------------------------------------------------------- + public Configuration getConfiguration() { + return configuration; + } + + public ResourceID getResourceID() { + return resourceID; + } + InetAddress getTaskManagerAddress() { return taskManagerAddress; } + boolean isLocalCommunicationOnly() { + return localCommunicationOnly; + } + public String[] getTmpDirPaths() { return tmpDirPaths; } @@ -155,6 +188,14 @@ public class TaskManagerServicesConfiguration { return memoryType; } + long 
getFreeHeapMemoryWithDefrag() { + return freeHeapMemoryWithDefrag; + } + + long getMaxJvmHeapMemory() { + return maxJvmHeapMemory; + } + /** * Returns the size of the managed memory (in megabytes), if configured. * @@ -195,13 +236,22 @@ public class TaskManagerServicesConfiguration { * sanity check them. * * @param configuration The configuration. + * @param resourceID resource ID of the task manager * @param remoteAddress identifying the IP address under which the TaskManager will be accessible + * @param freeHeapMemoryWithDefrag an estimate of the size of the free heap memory + * @param maxJvmHeapMemory the maximum JVM heap size + * @param localCommunicationOnly True if only local communication is possible. + * Use only in cases where only one task manager runs. * - * @return TaskExecutorConfiguration that wrappers InstanceConnectionInfo, etc. + * @return configuration of task manager services used to create them */ public static TaskManagerServicesConfiguration fromConfiguration( Configuration configuration, - InetAddress remoteAddress) { + ResourceID resourceID, + InetAddress remoteAddress, + long freeHeapMemoryWithDefrag, + long maxJvmHeapMemory, + boolean localCommunicationOnly) { final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration); String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration); if (localStateRootDir.length == 0) { @@ -220,9 +270,14 @@ public class TaskManagerServicesConfiguration { final RetryingRegistrationConfiguration retryingRegistrationConfiguration = RetryingRegistrationConfiguration.fromConfiguration(configuration); return new TaskManagerServicesConfiguration( + configuration, + resourceID, remoteAddress, + localCommunicationOnly, tmpDirs, localStateRootDir, + freeHeapMemoryWithDefrag, + maxJvmHeapMemory, localRecoveryMode, queryableStateConfig, ConfigurationParserUtils.getSlot(configuration), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 27d9cdc..5512c54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -49,7 +49,6 @@ import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider; @@ -63,6 +62,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; @@ -283,7 +283,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid int targetSlotNumber, MemoryManager memManager, IOManager ioManager, - NetworkEnvironment networkEnvironment, + ShuffleEnvironment<?, ?> shuffleEnvironment, KvStateService kvStateService, BroadcastVariableManager bcVarManager, TaskEventDispatcher taskEventDispatcher, @@ -368,12 +368,12 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid final MetricGroup inputGroup = networkGroup.addGroup("Input"); // produced intermediate result partitions - final ResultPartitionWriter[] resultPartitionWriters = 
networkEnvironment.createResultPartitionWriters( + final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment.createResultPartitionWriters( taskNameWithSubtaskAndId, executionId, resultPartitionDeploymentDescriptors, outputGroup, - buffersGroup); + buffersGroup).toArray(new ResultPartitionWriter[] {}); this.consumableNotifyingPartitionWriters = ConsumableNotifyingResultPartitionWriterDecorator.decorate( resultPartitionDeploymentDescriptors, @@ -383,14 +383,14 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid resultPartitionConsumableNotifier); // consumed intermediate result partitions - InputGate[] gates = networkEnvironment.createInputGates( + final InputGate[] gates = shuffleEnvironment.createInputGates( taskNameWithSubtaskAndId, executionId, this, inputGateDeploymentDescriptors, metrics.getIOMetricGroup(), inputGroup, - buffersGroup); + buffersGroup).toArray(new InputGate[] {}); this.inputGates = new InputGate[gates.length]; int counter = 0; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java index aad87494..948d4c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java @@ -146,11 +146,15 @@ public class ConfigurationParserUtils { configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes()); // check page size of for minimum size - checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize, + checkConfigParameter( + pageSize >= MemoryManager.MIN_PAGE_SIZE, + pageSize, TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(), "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE); // check page size for power of two - checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize, + checkConfigParameter( + 
MathUtils.isPowerOf2(pageSize), + pageSize, TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(), "Memory segment size must be a power of 2."); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java index 1950638..03ec3a4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java @@ -119,7 +119,6 @@ public class NetworkEnvironmentBuilder { public NetworkEnvironment build() { return NetworkEnvironment.create( - taskManagerLocation, new NetworkEnvironmentConfiguration( numNetworkBuffers, networkBufferSize, @@ -130,6 +129,7 @@ public class NetworkEnvironmentBuilder { isCreditBased, isNetworkDetailedMetrics, nettyConfig), + taskManagerLocation, taskEventDispatcher, metricGroup, ioManager); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index 933f385..4606318 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -124,7 +124,7 @@ public class NetworkEnvironmentTest { for (SingleInputGate ig : inputGates) { ig.close(); } - network.shutdown(); + network.close(); } /** @@ -245,7 +245,7 @@ public class NetworkEnvironmentTest { for (SingleInputGate ig : inputGates) { ig.close(); } - network.shutdown(); + network.close(); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index 36a98ff..de5eaf1 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -258,7 +258,7 @@ public class ResultPartitionTest { } } finally { resultPartition.release(); - network.shutdown(); + network.close(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 11c6a51..03fd3b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -401,7 +401,7 @@ public class SingleInputGateTest extends InputGateTestBase { } } finally { gate.close(); - netEnv.shutdown(); + netEnv.close(); } } @@ -436,7 +436,7 @@ public class SingleInputGateTest extends InputGateTestBase { } } finally { inputGate.close(); - network.shutdown(); + network.close(); } } @@ -487,7 +487,7 @@ public class SingleInputGateTest extends InputGateTestBase { } } finally { inputGate.close(); - network.shutdown(); + network.close(); } } @@ -537,7 +537,7 @@ public class SingleInputGateTest extends InputGateTestBase { is(instanceOf((LocalInputChannel.class)))); } finally { inputGate.close(); - network.shutdown(); + network.close(); } } @@ -580,7 +580,7 @@ public class SingleInputGateTest extends InputGateTestBase { assertThat(network.getInputGate(id).isPresent(), is(false)); } } finally { - network.shutdown(); + network.close(); } } @@ -614,7 +614,7 @@ public class SingleInputGateTest extends InputGateTestBase { Arrays.asList(gateDescs), new UnregisteredMetricsGroup(), new UnregisteredMetricsGroup(), - new UnregisteredMetricsGroup()); + new UnregisteredMetricsGroup()).toArray(new SingleInputGate[] {}); Map<InputGateID, 
SingleInputGate> inputGatesById = new HashMap<>(); for (int i = 0; i < numberOfGates; i++) { inputGatesById.put(new InputGateID(ids[i], consumerID), gates[i]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java index e80287f..d9eac20 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java @@ -66,7 +66,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { // test configuration of the local state mode config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true); - TaskManagerServices taskManagerServices = createTaskManagerServices(config, createTaskManagerServiceConfiguration(config)); + TaskManagerServices taskManagerServices = createTaskManagerServices(createTaskManagerServiceConfiguration(config)); TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore(); @@ -99,7 +99,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { TaskManagerServicesConfiguration taskManagerServicesConfiguration = createTaskManagerServiceConfiguration(config); - TaskManagerServices taskManagerServices = createTaskManagerServices(config, taskManagerServicesConfiguration); + TaskManagerServices taskManagerServices = createTaskManagerServices(taskManagerServicesConfiguration); TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore(); @@ -206,20 +206,18 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger { Configuration config) throws IOException { return TaskManagerServicesConfiguration.fromConfiguration( config, - InetAddress.getLocalHost()); + ResourceID.generate(), + InetAddress.getLocalHost(), 
+ MEM_SIZE_PARAM, + MEM_SIZE_PARAM, + true); } private TaskManagerServices createTaskManagerServices( - Configuration flinkConfig, TaskManagerServicesConfiguration config) throws Exception { return TaskManagerServices.fromConfiguration( - flinkConfig, config, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), - ResourceID.generate(), - Executors.directExecutor(), - MEM_SIZE_PARAM, - MEM_SIZE_PARAM, - true); + Executors.directExecutor()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java index 8a21da9..9673e13 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java @@ -37,7 +37,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphException; import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.PartitionInfo; import org.apache.flink.runtime.executiongraph.TaskInformation; -import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -276,7 +276,7 @@ public class TaskExecutorSubmissionTest extends TestLogger { .addTaskManagerActionListener(eid2, ExecutionState.FINISHED, task2FinishedFuture) .setJobMasterId(jobMasterId) .setJobMasterGateway(testingJobMasterGateway) - .useRealNonMockNetworkEnvironment() + .useRealNonMockShuffleEnvironment() .build()) { TaskExecutorGateway tmGateway = env.getTaskExecutorGateway(); TaskSlotTable taskSlotTable = env.getTaskSlotTable(); @@ -342,7 +342,7 @@ 
public class TaskExecutorSubmissionTest extends TestLogger { .addTaskManagerActionListener(eid2, ExecutionState.CANCELED, task2CanceledFuture) .setJobMasterId(jobMasterId) .setJobMasterGateway(testingJobMasterGateway) - .useRealNonMockNetworkEnvironment() + .useRealNonMockShuffleEnvironment() .build()) { TaskExecutorGateway tmGateway = env.getTaskExecutorGateway(); TaskSlotTable taskSlotTable = env.getTaskSlotTable(); @@ -392,7 +392,7 @@ public class TaskExecutorSubmissionTest extends TestLogger { .addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture) .setConfiguration(config) .setLocalCommunication(false) - .useRealNonMockNetworkEnvironment() + .useRealNonMockShuffleEnvironment() .build()) { TaskExecutorGateway tmGateway = env.getTaskExecutorGateway(); TaskSlotTable taskSlotTable = env.getTaskSlotTable(); @@ -417,11 +417,11 @@ public class TaskExecutorSubmissionTest extends TestLogger { final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>(); final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>(); - final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS); + final ShuffleEnvironment<?, ?> shuffleEnvironment = mock(ShuffleEnvironment.class, Mockito.RETURNS_MOCKS); try (TaskSubmissionTestEnvironment env = new TaskSubmissionTestEnvironment.Builder(jobId) - .setNetworkEnvironment(networkEnvironment) + .setShuffleEnvironment(shuffleEnvironment) .setSlotSize(1) .addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture) .addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture) @@ -437,7 +437,7 @@ public class TaskExecutorSubmissionTest extends TestLogger { NettyShuffleDescriptor shuffleDescriptor = createRemoteWithIdAndLocation(new IntermediateResultPartitionID(), producerLocation); final PartitionInfo partitionUpdate = new PartitionInfo(new IntermediateDataSetID(), shuffleDescriptor); - doThrow(new 
IOException()).when(networkEnvironment).updatePartitionInfo(eid, partitionUpdate); + doThrow(new IOException()).when(shuffleEnvironment).updatePartitionInfo(eid, partitionUpdate); final CompletableFuture<Acknowledge> updateFuture = tmGateway.updatePartitions( eid, @@ -477,7 +477,7 @@ public class TaskExecutorSubmissionTest extends TestLogger { .addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture) .addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture) .setConfiguration(config) - .useRealNonMockNetworkEnvironment() + .useRealNonMockShuffleEnvironment() .build()) { TaskExecutorGateway tmGateway = env.getTaskExecutorGateway(); TaskSlotTable taskSlotTable = env.getTaskSlotTable(); @@ -536,7 +536,7 @@ public class TaskExecutorSubmissionTest extends TestLogger { .addTaskManagerActionListener(eid, ExecutionState.RUNNING, taskRunningFuture) .setJobMasterId(jobMasterId) .setJobMasterGateway(testingJobMasterGateway) - .useRealNonMockNetworkEnvironment() + .useRealNonMockShuffleEnvironment() .build()) { TaskExecutorGateway tmGateway = env.getTaskExecutorGateway(); TaskSlotTable taskSlotTable = env.getTaskSlotTable(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 4d0b4d5..f1d607a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -244,7 +244,7 @@ public class TaskExecutorTest extends TestLogger { } if (networkEnvironment != null) { - networkEnvironment.shutdown(); + networkEnvironment.close(); } testingFatalErrorHandler.rethrowError(); @@ -279,7 +279,7 @@ public class TaskExecutorTest extends TestLogger { .setTaskManagerLocation(taskManagerLocation) .setMemoryManager(memoryManager) .setIoManager(ioManager) - 
.setNetworkEnvironment(networkEnvironment) + .setShuffleEnvironment(networkEnvironment) .setKvStateService(kvStateService) .setTaskSlotTable(taskSlotTable) .setJobLeaderService(jobLeaderService) @@ -295,7 +295,7 @@ public class TaskExecutorTest extends TestLogger { } assertThat(memoryManager.isShutdown(), is(true)); - assertThat(networkEnvironment.isShutdown(), is(true)); + assertThat(networkEnvironment.isClosed(), is(true)); assertThat(ioManager.isProperlyShutDown(), is(true)); assertThat(kvStateService.isShutdown(), is(true)); } @@ -709,7 +709,7 @@ public class TaskExecutorTest extends TestLogger { final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() - .setNetworkEnvironment(networkEnvironment) + .setShuffleEnvironment(networkEnvironment) .setTaskSlotTable(taskSlotTable) .setJobManagerTable(jobManagerTable) .setTaskStateManager(localStateStoresManager) @@ -967,7 +967,7 @@ public class TaskExecutorTest extends TestLogger { final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) - .setNetworkEnvironment(networkEnvironment) + .setShuffleEnvironment(networkEnvironment) .setTaskSlotTable(taskSlotTable) .setJobLeaderService(jobLeaderService) .setJobManagerTable(jobManagerTable) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java index 0238edf..c763db6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.core.memory.MemoryType; import 
org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.query.KvStateRegistry; @@ -42,7 +42,7 @@ public class TaskManagerServicesBuilder { private TaskManagerLocation taskManagerLocation; private MemoryManager memoryManager; private IOManager ioManager; - private NetworkEnvironment networkEnvironment; + private ShuffleEnvironment<?, ?> shuffleEnvironment; private KvStateService kvStateService; private BroadcastVariableManager broadcastVariableManager; private TaskSlotTable taskSlotTable; @@ -60,7 +60,7 @@ public class TaskManagerServicesBuilder { MemoryType.HEAP, false); ioManager = mock(IOManager.class); - networkEnvironment = mock(NetworkEnvironment.class); + shuffleEnvironment = mock(ShuffleEnvironment.class); kvStateService = new KvStateService(new KvStateRegistry(), null, null); broadcastVariableManager = new BroadcastVariableManager(); taskEventDispatcher = new TaskEventDispatcher(); @@ -85,8 +85,8 @@ public class TaskManagerServicesBuilder { return this; } - public TaskManagerServicesBuilder setNetworkEnvironment(NetworkEnvironment networkEnvironment) { - this.networkEnvironment = networkEnvironment; + public TaskManagerServicesBuilder setShuffleEnvironment(ShuffleEnvironment<?, ?> shuffleEnvironment) { + this.shuffleEnvironment = shuffleEnvironment; return this; } @@ -125,7 +125,7 @@ public class TaskManagerServicesBuilder { taskManagerLocation, memoryManager, ioManager, - networkEnvironment, + shuffleEnvironment, kvStateService, broadcastVariableManager, taskSlotTable, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java index 6b185c6..958696f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java @@ -34,8 +34,8 @@ import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; @@ -104,7 +104,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { Configuration configuration, List<Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>>> taskManagerActionListeners, TestingRpcService testingRpcService, - NetworkEnvironment networkEnvironment) throws Exception { + ShuffleEnvironment<?, ?> shuffleEnvironment) throws Exception { this.haServices = new TestingHighAvailabilityServices(); this.haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService()); @@ -154,7 +154,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { Executors.directExecutor()); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() - .setNetworkEnvironment(networkEnvironment) + .setShuffleEnvironment(shuffleEnvironment) .setTaskSlotTable(taskSlotTable) .setJobManagerTable(jobManagerTable) .setTaskStateManager(localStateStoresManager) @@ -229,15 +229,15 @@ class 
TaskSubmissionTestEnvironment implements AutoCloseable { partitionProducerStateChecker); } - private static NetworkEnvironment createNetworkEnvironment( + private static ShuffleEnvironment<?, ?> createShuffleEnvironment( ResourceID taskManagerLocation, boolean localCommunication, Configuration configuration, RpcService testingRpcService, - boolean mockNetworkEnvironment) throws Exception { - final NetworkEnvironment networkEnvironment; - if (mockNetworkEnvironment) { - networkEnvironment = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS); + boolean mockShuffleEnvironment) throws Exception { + final ShuffleEnvironment<?, ?> shuffleEnvironment; + if (mockShuffleEnvironment) { + shuffleEnvironment = mock(ShuffleEnvironment.class, Mockito.RETURNS_MOCKS); } else { final InetSocketAddress socketAddress = new InetSocketAddress( InetAddress.getByName(testingRpcService.getAddress()), configuration.getInteger(NetworkEnvironmentOptions.DATA_PORT)); @@ -245,17 +245,17 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { final NettyConfig nettyConfig = new NettyConfig(socketAddress.getAddress(), socketAddress.getPort(), ConfigurationParserUtils.getPageSize(configuration), ConfigurationParserUtils.getSlot(configuration), configuration); - networkEnvironment = new NetworkEnvironmentBuilder() + shuffleEnvironment = new NetworkEnvironmentBuilder() .setTaskManagerLocation(taskManagerLocation) .setPartitionRequestInitialBackoff(configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL)) .setPartitionRequestMaxBackoff(configuration.getInteger(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX)) .setNettyConfig(localCommunication ? 
null : nettyConfig) .build(); - networkEnvironment.start(); + shuffleEnvironment.start(); } - return networkEnvironment; + return shuffleEnvironment; } @Override @@ -274,14 +274,14 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { public static final class Builder { private JobID jobId; - private boolean mockNetworkEnvironment = true; + private boolean mockShuffleEnvironment = true; private int slotSize; private JobMasterId jobMasterId = JobMasterId.generate(); private TestingJobMasterGateway jobMasterGateway; private boolean localCommunication = true; private Configuration configuration = new Configuration(); @SuppressWarnings("OptionalUsedAsFieldOrParameterType") - private Optional<NetworkEnvironment> optionalNetworkEnvironment = Optional.empty(); + private Optional<ShuffleEnvironment<?, ?>> optionalShuffleEnvironment = Optional.empty(); private ResourceID resourceID = ResourceID.generate(); private List<Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>>> taskManagerActionListeners = new ArrayList<>(); @@ -290,15 +290,15 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { this.jobId = jobId; } - public Builder useRealNonMockNetworkEnvironment() { - this.optionalNetworkEnvironment = Optional.empty(); - this.mockNetworkEnvironment = false; + public Builder useRealNonMockShuffleEnvironment() { + this.optionalShuffleEnvironment = Optional.empty(); + this.mockShuffleEnvironment = false; return this; } - public Builder setNetworkEnvironment(NetworkEnvironment optionalNetworkEnvironment) { - this.mockNetworkEnvironment = false; - this.optionalNetworkEnvironment = Optional.of(optionalNetworkEnvironment); + public Builder setShuffleEnvironment(ShuffleEnvironment<?, ?> optionalShuffleEnvironment) { + this.mockShuffleEnvironment = false; + this.optionalShuffleEnvironment = Optional.of(optionalShuffleEnvironment); return this; } @@ -339,14 +339,13 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { public 
TaskSubmissionTestEnvironment build() throws Exception { final TestingRpcService testingRpcService = new TestingRpcService(); - final NetworkEnvironment network = optionalNetworkEnvironment.orElseGet(() -> { + final ShuffleEnvironment<?, ?> network = optionalShuffleEnvironment.orElseGet(() -> { try { - return createNetworkEnvironment( - resourceID, + return createShuffleEnvironment(resourceID, localCommunication, configuration, testingRpcService, - mockNetworkEnvironment); + mockShuffleEnvironment); } catch (Exception e) { throw new FlinkRuntimeException("Failed to build TaskSubmissionTestEnvironment", e); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 01cd184..bfcbd13 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -40,8 +40,8 @@ import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier; @@ -106,7 +106,7 @@ public class TaskAsyncCallTest extends TestLogger { private static final List<ClassLoader> classLoaders = Collections.synchronizedList(new ArrayList<>()); - private NetworkEnvironment networkEnvironment; + private ShuffleEnvironment shuffleEnvironment; @Before public 
void createQueuesAndActors() { @@ -117,15 +117,15 @@ public class TaskAsyncCallTest extends TestLogger { notifyCheckpointCompleteLatch = new OneShotLatch(); stopLatch = new OneShotLatch(); - networkEnvironment = new NetworkEnvironmentBuilder().build(); + shuffleEnvironment = new NetworkEnvironmentBuilder().build(); classLoaders.clear(); } @After - public void teardown() { - if (networkEnvironment != null) { - networkEnvironment.shutdown(); + public void teardown() throws Exception { + if (shuffleEnvironment != null) { + shuffleEnvironment.close(); } } @@ -255,7 +255,7 @@ public class TaskAsyncCallTest extends TestLogger { 0, mock(MemoryManager.class), mock(IOManager.class), - networkEnvironment, + shuffleEnvironment, new KvStateService(new KvStateRegistry(), null, null), mock(BroadcastVariableManager.class), new TaskEventDispatcher(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index f23fc9d..a1b73df 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.blob.TransientBlobCache; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; @@ -47,8 +46,8 @@ import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import 
org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; @@ -125,7 +124,7 @@ public class TaskTest extends TestLogger { private static OneShotLatch awaitLatch; private static OneShotLatch triggerLatch; - private NetworkEnvironment networkEnvironment; + private ShuffleEnvironment<?, ?> shuffleEnvironment; @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); @@ -135,13 +134,13 @@ public class TaskTest extends TestLogger { awaitLatch = new OneShotLatch(); triggerLatch = new OneShotLatch(); - networkEnvironment = new NetworkEnvironmentBuilder().build(); + shuffleEnvironment = new NetworkEnvironmentBuilder().build(); } @After - public void teardown() { - if (networkEnvironment != null) { - networkEnvironment.shutdown(); + public void teardown() throws Exception { + if (shuffleEnvironment != null) { + shuffleEnvironment.close(); } } @@ -308,7 +307,7 @@ public class TaskTest extends TestLogger { final PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class); final QueuedNoOpTaskManagerActions taskManagerActions = new QueuedNoOpTaskManagerActions(); - final Task task = new TaskBuilder(networkEnvironment) + final Task task = new TaskBuilder(shuffleEnvironment) .setTaskManagerActions(taskManagerActions) .setConsumableNotifier(consumableNotifier) .setPartitionProducerStateChecker(partitionProducerStateChecker) @@ -317,7 +316,7 @@ public class TaskTest extends TestLogger { .build(); // shut down the network to make the following task registration failure - 
networkEnvironment.shutdown(); + shuffleEnvironment.close(); // should fail task.run(); @@ -974,7 +973,7 @@ public class TaskTest extends TestLogger { } private TaskBuilder createTaskBuilder() { - return new TaskBuilder(networkEnvironment); + return new TaskBuilder(shuffleEnvironment); } private static final class TaskBuilder { @@ -983,7 +982,7 @@ public class TaskTest extends TestLogger { private LibraryCacheManager libraryCacheManager; private ResultPartitionConsumableNotifier consumableNotifier; private PartitionProducerStateChecker partitionProducerStateChecker; - private final NetworkEnvironment networkEnvironment; + private final ShuffleEnvironment<?, ?> shuffleEnvironment; private KvStateService kvStateService; private Executor executor; private Configuration taskManagerConfig; @@ -1012,8 +1011,8 @@ public class TaskTest extends TestLogger { requiredJarFileBlobKeys = Collections.emptyList(); } - private TaskBuilder(NetworkEnvironment networkEnvironment) { - this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment); + private TaskBuilder(ShuffleEnvironment<?, ?> shuffleEnvironment) { + this.shuffleEnvironment = Preconditions.checkNotNull(shuffleEnvironment); } TaskBuilder setInvokable(Class<? 
extends AbstractInvokable> invokable) { @@ -1117,7 +1116,7 @@ public class TaskTest extends TestLogger { 0, mock(MemoryManager.class), mock(IOManager.class), - networkEnvironment, + shuffleEnvironment, kvStateService, mock(BroadcastVariableManager.class), new TaskEventDispatcher(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java index 51930a0..99db686 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java @@ -43,8 +43,8 @@ import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker; import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier; @@ -168,7 +168,7 @@ public class JvmExitOnFatalErrorTest { final MemoryManager memoryManager = new MemoryManager(1024 * 1024, 1); final IOManager ioManager = new IOManagerAsync(); - final NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build(); + final ShuffleEnvironment shuffleEnvironment = new NetworkEnvironmentBuilder().build(); final TaskManagerRuntimeInfo tmInfo = TaskManagerConfiguration.fromConfiguration(taskManagerConfig); @@ -206,7 +206,7 @@ public class JvmExitOnFatalErrorTest { 0, // targetSlotNumber memoryManager, ioManager, - networkEnvironment, + shuffleEnvironment, new 
KvStateService(new KvStateRegistry(), null, null), new BroadcastVariableManager(), new TaskEventDispatcher(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index 8a0ad2f..8f50198 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -166,8 +166,8 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { } public void tearDown() { - suppressExceptions(senderEnv::shutdown); - suppressExceptions(receiverEnv::shutdown); + suppressExceptions(senderEnv::close); + suppressExceptions(receiverEnv::close); suppressExceptions(ioManager::shutdown); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index ce5a7e1..e0f3b52 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -43,7 +43,6 @@ import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import 
org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier; @@ -53,6 +52,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -181,7 +181,7 @@ public class InterruptSensitiveRestoreTest { StreamStateHandle state, int mode) throws IOException { - NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build(); + ShuffleEnvironment shuffleEnvironment = new NetworkEnvironmentBuilder().build(); Collection<KeyedStateHandle> keyedStateFromBackend = Collections.emptyList(); Collection<KeyedStateHandle> keyedStateFromStream = Collections.emptyList(); @@ -270,7 +270,7 @@ public class InterruptSensitiveRestoreTest { 0, mock(MemoryManager.class), mock(IOManager.class), - networkEnvironment, + shuffleEnvironment, new KvStateService(new KvStateRegistry(), null, null), mock(BroadcastVariableManager.class), new TaskEventDispatcher(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 4cffdbd..64b55fc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -43,7 +43,6 @@ import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; import 
org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier; @@ -54,6 +53,7 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -150,7 +150,7 @@ public class StreamTaskTerminationTest extends TestLogger { final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo(); - final NetworkEnvironment networkEnv = new NetworkEnvironmentBuilder().build(); + final ShuffleEnvironment shuffleEnvironment = new NetworkEnvironmentBuilder().build(); BlobCacheService blobService = new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class)); @@ -167,7 +167,7 @@ public class StreamTaskTerminationTest extends TestLogger { 0, new MemoryManager(32L * 1024L, 1), new IOManagerAsync(), - networkEnv, + shuffleEnvironment, new KvStateService(new KvStateRegistry(), null, null), mock(BroadcastVariableManager.class), new TaskEventDispatcher(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 3439574..c4d5965 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java 
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -49,7 +49,6 @@ import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier; @@ -65,6 +64,7 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStorage; @@ -902,7 +902,7 @@ public class StreamTaskTest extends TestLogger { PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class); Executor executor = mock(Executor.class); - NetworkEnvironment network = new NetworkEnvironmentBuilder().build(); + ShuffleEnvironment shuffleEnvironment = new NetworkEnvironmentBuilder().build(); JobInformation jobInformation = new JobInformation( new JobID(), @@ -932,7 +932,7 @@ public class StreamTaskTest extends TestLogger { 0, mock(MemoryManager.class), mock(IOManager.class), - network, + shuffleEnvironment, new KvStateService(new KvStateRegistry(), null, null), mock(BroadcastVariableManager.class), new TaskEventDispatcher(), diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java index 0416e5c..5dbd5ce 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java @@ -42,7 +42,6 @@ import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier; @@ -54,6 +53,7 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.taskexecutor.KvStateService; @@ -263,7 +263,7 @@ public class SynchronousCheckpointITCase { ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier(); PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class); Executor executor = mock(Executor.class); - NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build(); + ShuffleEnvironment shuffleEnvironment = new 
NetworkEnvironmentBuilder().build(); TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(); @@ -295,7 +295,7 @@ public class SynchronousCheckpointITCase { 0, mock(MemoryManager.class), mock(IOManager.class), - networkEnvironment, + shuffleEnvironment, new KvStateService(new KvStateRegistry(), null, null), mock(BroadcastVariableManager.class), new TaskEventDispatcher(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index a6268ce..9f91a00 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -45,7 +45,6 @@ import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier; @@ -55,6 +54,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.AbstractSnapshotStrategy; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -224,7 +224,7 
@@ public class TaskCheckpointingBehaviourTest extends TestLogger { TestStreamTask.class.getName(), taskConfig); - NetworkEnvironment network = new NetworkEnvironmentBuilder().build(); + ShuffleEnvironment<?, ?> shuffleEnvironment = new NetworkEnvironmentBuilder().build(); BlobCacheService blobService = new BlobCacheService(mock(PermanentBlobCache.class), mock(TransientBlobCache.class)); @@ -241,7 +241,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { 0, mock(MemoryManager.class), mock(IOManager.class), - network, + shuffleEnvironment, new KvStateService(new KvStateRegistry(), null, null), mock(BroadcastVariableManager.class), new TaskEventDispatcher(),