tillrohrmann commented on a change in pull request #8608: 
[FLINK-11392][network] Introduce ShuffleEnvironment interface
URL: https://github.com/apache/flink/pull/8608#discussion_r291566436
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
 ##########
 @@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Interface for the implementation of shuffle service local environment.
+ *
+ * <p>Input/Output interface of local shuffle service environment is based on 
memory {@link Buffer Buffers}.
+ * A producer can write shuffle data into the buffers,
+ * obtained from the created here {@link ResultPartitionWriter 
ResultPartitionWriters}
+ * and a consumer reads the buffers from the created here {@link InputGate 
InputGates}.
+ *
+ * <h2>Lifecycle management.</h2>
+ *
+ * <p>The interface contains methods to manage the lifecycle of the local 
shuffle service environment:
+ * <ol>
+ *     <li>{@link ShuffleEnvironment#start} must be called before using the 
shuffle service environment.</li>
+ *     <li>{@link ShuffleEnvironment#close} is called to release the shuffle 
service environment.</li>
+ * </ol>
+ *
+ * <h2>Shuffle Input/Output management.</h2>
+ *
+ * <h3>Result partition management.</h3>
+ *
+ * <p>The interface implements a factory of result partition writers to 
produce shuffle data: {@link ShuffleEnvironment#createResultPartitionWriters}.
+ * The created writers are grouped per owner. The owner is responsible for the 
writers' lifecycle from the moment of creation.
+ *
+ * <p>Partitions are released in the following cases:
+ * <ol>
+ *     <li>{@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
+ *     if the production has failed.</li>
+ *     <li>{@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called
+ *     if the production is done. The actual release can take some time 
depending on implementation details, e.g.
+ *     if the `end of consumption' confirmation from the consumer is being 
awaited implicitly
+ *     or the partition is later released by {@link 
ShuffleEnvironment#releasePartitions(Collection)}.</li>
+ *     <li>{@link ShuffleEnvironment#releasePartitions(Collection)} is called 
outside of the producer thread,
+ *     e.g. to manage the lifecycle of BLOCKING result partitions which can 
outlive their producers.</li>
+ * </ol>
+ * The partitions, which currently still occupy local resources, can be 
queried with {@link ShuffleEnvironment#getPartitionsOccupyingLocalResources}.
+ *
+ * <h3>Input gate management.</h3>
+ *
+ * <p>The interface implements a factory for the input gates: {@link 
ShuffleEnvironment#createInputGates}.
+ * The created gates are grouped per owner. The owner is responsible for the 
gates' lifecycle from the moment of creation.
+ *
+ * <p>When the input gates are created, it can happen that not all consumed 
partitions
+ * are known at that moment e.g. because their producers have not been started 
yet.
+ * Therefore, the {@link ShuffleEnvironment} provides a method {@link 
ShuffleEnvironment#updatePartitionInfo} to update them
+ * externally, when the producer becomes known. The update mechanism has to be 
threadsafe because
+ * the updated gate can be read concurrently from a different thread.
+ */
+public interface ShuffleEnvironment extends AutoCloseable {
+
+       /**
+        * Start the internal related services before using the shuffle service 
environment.
+        *
+        * @return a port to connect for the shuffle data exchange, -1 if only 
local connection is possible.
+        */
+       int start() throws IOException;
+
+       /**
+        * Factory method for the {@link ResultPartitionWriter 
ResultPartitionWriters} to produce result partitions.
+        *
+        * <p>The order of the {@link ResultPartitionWriter 
ResultPartitionWriters} in the returned array
+        * should be the same as the iteration order of the passed {@code 
resultPartitionDeploymentDescriptors}.
+        *
+        * @param ownerName the owner name, used for logs
+        * @param executionAttemptID execution attempt id of the producer
+        * @param resultPartitionDeploymentDescriptors descriptors of the 
partition, produced by the owner
+        * @param outputGroup shuffle specific group for output metrics
+        * @param buffersGroup shuffle specific group for buffer metrics
+        * @return array of the {@link ResultPartitionWriter 
ResultPartitionWriters}
+        */
+       ResultPartitionWriter[] createResultPartitionWriters(
+               String ownerName,
+               ExecutionAttemptID executionAttemptID,
+               Collection<ResultPartitionDeploymentDescriptor> 
resultPartitionDeploymentDescriptors,
+               MetricGroup outputGroup,
+               MetricGroup buffersGroup);
+
+       /**
+        * Release local resources occupied by the given partitions.
+        *
+        * @param partitionIds identifying the partitions to be released
+        */
+       void releasePartitions(Collection<ResultPartitionID> partitionIds);
+
+       /**
+        * Report partitions which still occupy some resources locally.
+        *
+        * @return collection of partitions which still occupy some resources 
locally
+        * and have not been released yet.
+        */
+       Collection<ResultPartitionID> getPartitionsOccupyingLocalResources();
+
+       /**
+        * Factory method for the {@link InputGate InputGates} to consume 
result partitions.
+        *
+        * <p>The order of the {@link InputGate InputGates} in the returned 
array
+        * should be the same as the iteration order of the passed {@code 
inputGateDeploymentDescriptors}.
+        *
+        * @param ownerName the owner name, used for logs
+        * @param executionAttemptID execution attempt id of the consumer
+        * @param partitionProducerStateProvider producer state provider to 
query whether the producer is ready for consumption
+        * @param inputGateDeploymentDescriptors descriptors of the input gates 
to consume
+        * @param parentGroup parent of shuffle specific metric group
+        * @param inputGroup shuffle specific group for input metrics
+        * @param buffersGroup shuffle specific group for buffer metrics
+        * @return array of the {@link InputGate InputGates}
+        */
+       InputGate[] createInputGates(
+               String ownerName,
+               ExecutionAttemptID executionAttemptID,
+               PartitionProducerStateProvider partitionProducerStateProvider,
+               Collection<InputGateDeploymentDescriptor> 
inputGateDeploymentDescriptors,
+               MetricGroup parentGroup,
+               MetricGroup inputGroup,
+               MetricGroup buffersGroup);
+
+       /**
+        * Update a gate with the newly available partition information, 
previously unknown.
+        *
+        * @param consumerID execution id to distinguish gates with the same id 
from the different consumer executions
+        * @param partitionInfo information needed to consume the updated 
partition, e.g. network location
+        * @return {@code true} if the partition has been updated or {@code 
false} if the partition is not available anymore.
+        * @throws IOException if an I/O problem occurs during the update
+        * @throws InterruptedException potentially blocking operation was 
interrupted
+        */
+       boolean updatePartitionInfo(
+               ExecutionAttemptID consumerID,
+               PartitionInfo partitionInfo) throws IOException, 
InterruptedException;
+
+       /**
+        * {@link ShuffleEnvironment} local context used to create it.
+        */
+       class ShuffleEnvironmentContext {
+               private final Configuration configuration;
+               private final ResourceID location;
+               private final long maxJvmHeapMemory;
+               private final boolean localCommunicationOnly;
+               private final InetAddress hostAddress;
+               private final TaskEventPublisher eventPublisher;
+               private final MetricGroup parentMetricGroup;
+               private final IOManager ioManager;
+
+               public ShuffleEnvironmentContext(
+                               Configuration configuration,
+                               ResourceID location,
+                               long maxJvmHeapMemory,
+                               boolean localCommunicationOnly,
+                               InetAddress hostAddress,
+                               TaskEventDispatcher eventPublisher,
+                               MetricGroup parentMetricGroup,
+                               IOManager ioManager) {
+                       this.configuration = checkNotNull(configuration);
+                       this.location = checkNotNull(location);
+                       this.maxJvmHeapMemory = maxJvmHeapMemory;
+                       this.localCommunicationOnly = localCommunicationOnly;
+                       this.hostAddress = checkNotNull(hostAddress);
+                       this.eventPublisher = eventPublisher;
+                       this.parentMetricGroup = parentMetricGroup;
+                       this.ioManager = ioManager;
 
 Review comment:
   Why not do the `checkNotNull` for the last three parameters?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to