This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6ce4e2b7ec958570068838491bd88888f56f880c
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Fri Jul 26 12:13:23 2019 +0200

    [FLINK-13435] Remove ShuffleDescriptor.ReleaseType and make release 
semantics fixed per partition type
    
    In the long term we do not need auto-release semantics for blocking 
(persistent) partitions. We expect them always to be released externally by the 
JM and assume they can be consumed multiple times.
    
    Pipelined partitions always have only one consumer and one consumption 
attempt. Afterwards they can always be released automatically.
    
    ShuffleDescriptor.ReleaseType was introduced to make release semantics more 
flexible, but it is not needed in the long term.
    
    FORCE_PARTITION_RELEASE_ON_CONSUMPTION was introduced as a safety net to be 
able to fall back to the 1.8 behaviour without the partition tracker and JM taking 
care of blocking partition release. We can make this option specific to 
NettyShuffleEnvironment which was the only existing shuffle service before. If 
it is activated then the blocking partition is also auto-released on a 
consumption attempt as it was before. The fine-grained recovery will just not 
find the partition after the jo [...]
---
 .../flink/configuration/JobManagerOptions.java     |  5 --
 .../NettyShuffleEnvironmentOptions.java            |  5 ++
 .../ResultPartitionDeploymentDescriptor.java       | 57 ----------------------
 .../flink/runtime/executiongraph/Execution.java    |  9 +---
 .../runtime/executiongraph/ExecutionGraph.java     | 10 ----
 .../executiongraph/ExecutionGraphBuilder.java      |  4 --
 .../io/network/NettyShuffleServiceFactory.java     |  3 +-
 .../io/network/partition/PartitionTrackerImpl.java |  4 +-
 .../network/partition/ResultPartitionFactory.java  | 10 ++--
 .../runtime/shuffle/NettyShuffleDescriptor.java    | 19 +-------
 .../flink/runtime/shuffle/NettyShuffleMaster.java  |  3 +-
 .../flink/runtime/shuffle/ShuffleDescriptor.java   | 34 +------------
 .../flink/runtime/shuffle/ShuffleEnvironment.java  | 26 +++++-----
 .../flink/runtime/shuffle/ShuffleMaster.java       |  5 --
 .../runtime/shuffle/UnknownShuffleDescriptor.java  |  6 ---
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  4 +-
 .../NettyShuffleEnvironmentConfiguration.java      | 21 ++++++--
 .../ResultPartitionDeploymentDescriptorTest.java   | 15 +-----
 .../io/network/NettyShuffleEnvironmentBuilder.java |  3 +-
 .../io/network/partition/PartitionTestUtils.java   | 56 ++-------------------
 .../partition/PartitionTrackerImplTest.java        | 43 ++++++++--------
 .../network/partition/ResultPartitionBuilder.java  |  4 +-
 .../partition/ResultPartitionFactoryTest.java      | 26 ++++++----
 .../TaskExecutorPartitionLifecycleTest.java        |  3 +-
 .../util/NettyShuffleDescriptorBuilder.java        | 13 +----
 .../recovery/BatchFineGrainedRecoveryITCase.java   |  2 +-
 26 files changed, 103 insertions(+), 287 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index e062829..3643667 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -195,11 +195,6 @@ public class JobManagerOptions {
                        .defaultValue(true)
                        .withDescription("Controls whether partitions should 
already be released during the job execution.");
 
-       @Documentation.ExcludeFromDocumentation("dev use only; likely 
temporary")
-       public static final ConfigOption<Boolean> 
FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
-                       
key("jobmanager.scheduler.partition.force-release-on-consumption")
-                       .defaultValue(false);
-
        // 
---------------------------------------------------------------------------------------------
 
        private JobManagerOptions() {
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 4ba4c8e..733085e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -231,6 +231,11 @@ public class NettyShuffleEnvironmentOptions {
 
        // 
------------------------------------------------------------------------
 
+       @Documentation.ExcludeFromDocumentation("dev use only; likely 
temporary")
+       public static final ConfigOption<Boolean> 
FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
+               
key("taskmanager.network.partition.force-release-on-consumption")
+                       .defaultValue(false);
+
        /** Not intended to be instantiated. */
        private NettyShuffleEnvironmentOptions() {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 39b61d2..064c9bd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -18,22 +18,16 @@
 
 package org.apache.flink.runtime.deployment;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
-import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
-import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 
 import java.io.Serializable;
-import java.util.Collection;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -54,48 +48,16 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
        /** Flag whether the result partition should send 
scheduleOrUpdateConsumer messages. */
        private final boolean sendScheduleOrUpdateConsumersMessage;
 
-       private final ReleaseType releaseType;
-
-       @VisibleForTesting
        public ResultPartitionDeploymentDescriptor(
                        PartitionDescriptor partitionDescriptor,
                        ShuffleDescriptor shuffleDescriptor,
                        int maxParallelism,
                        boolean sendScheduleOrUpdateConsumersMessage) {
-               this(
-                       checkNotNull(partitionDescriptor),
-                       shuffleDescriptor,
-                       maxParallelism,
-                       sendScheduleOrUpdateConsumersMessage,
-                       ReleaseType.AUTO);
-       }
-
-       public ResultPartitionDeploymentDescriptor(
-                       PartitionDescriptor partitionDescriptor,
-                       ShuffleDescriptor shuffleDescriptor,
-                       int maxParallelism,
-                       boolean sendScheduleOrUpdateConsumersMessage,
-                       ReleaseType releaseType) {
-               
checkReleaseOnConsumptionIsSupportedForPartition(shuffleDescriptor, 
releaseType);
                this.partitionDescriptor = checkNotNull(partitionDescriptor);
                this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
                
KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
                this.maxParallelism = maxParallelism;
                this.sendScheduleOrUpdateConsumersMessage = 
sendScheduleOrUpdateConsumersMessage;
-               this.releaseType = releaseType;
-       }
-
-       private static void checkReleaseOnConsumptionIsSupportedForPartition(
-                       ShuffleDescriptor shuffleDescriptor,
-                       ReleaseType releaseType) {
-               checkNotNull(shuffleDescriptor);
-               checkArgument(
-                       
shuffleDescriptor.getSupportedReleaseTypes().contains(releaseType),
-                       "Release type %s is not supported by the shuffle 
service for this partition" +
-                               "(id: %s), supported release types: %s",
-                       releaseType,
-                       shuffleDescriptor.getResultPartitionID(),
-                       shuffleDescriptor.getSupportedReleaseTypes());
        }
 
        public IntermediateDataSetID getResultId() {
@@ -126,25 +88,6 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                return sendScheduleOrUpdateConsumersMessage;
        }
 
-       /**
-        * Returns whether to release the partition after having been fully 
consumed once.
-        *
-        * <p>Indicates whether the shuffle service should automatically 
release all partition resources after
-        * the first full consumption has been acknowledged. This kind of 
partition does not need to be explicitly released
-        * by {@link 
ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
-        * and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
-        *
-        * <p>The partition has to support the corresponding {@link 
ReleaseType} in
-        * {@link ShuffleDescriptor#getSupportedReleaseTypes()}:
-        * {@link ReleaseType#AUTO} for {@code isReleasedOnConsumption()} to 
return {@code true} and
-        * {@link ReleaseType#MANUAL} for {@code isReleasedOnConsumption()} to 
return {@code false}.
-        *
-        * @return whether to release the partition after having been fully 
consumed once.
-        */
-       public boolean isReleasedOnConsumption() {
-               return releaseType == ReleaseType.AUTO;
-       }
-
        @Override
        public String toString() {
                return String.format("ResultPartitionDeploymentDescriptor 
[PartitionDescriptor: %s, "
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 6679fe1..af94188 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -631,19 +631,12 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                .getShuffleMaster()
                                
.registerPartitionWithProducer(partitionDescriptor, producerDescriptor);
 
-                       final boolean releasePartitionOnConsumption =
-                               
vertex.getExecutionGraph().isForcePartitionReleaseOnConsumption()
-                               || 
!partitionDescriptor.getPartitionType().isBlocking();
-
                        CompletableFuture<ResultPartitionDeploymentDescriptor> 
partitionRegistration = shuffleDescriptorFuture
                                .thenApply(shuffleDescriptor -> new 
ResultPartitionDeploymentDescriptor(
                                        partitionDescriptor,
                                        shuffleDescriptor,
                                        maxParallelism,
-                                       lazyScheduling,
-                                       releasePartitionOnConsumption
-                                               ? 
ShuffleDescriptor.ReleaseType.AUTO
-                                               : 
ShuffleDescriptor.ReleaseType.MANUAL));
+                                       lazyScheduling));
                        partitionRegistrations.add(partitionRegistration);
                }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cc51042..f779fd2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -320,8 +320,6 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
        /** Shuffle master to register partitions for task deployment. */
        private final ShuffleMaster<?> shuffleMaster;
 
-       private boolean forcePartitionReleaseOnConsumption;
-
        // 
--------------------------------------------------------------------------------------------
        //   Constructors
        // 
--------------------------------------------------------------------------------------------
@@ -425,7 +423,6 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                        allocationTimeout,
                        new NotReleasingPartitionReleaseStrategy.Factory(),
                        NettyShuffleMaster.INSTANCE,
-                       true,
                        new PartitionTrackerImpl(
                                jobInformation.getJobId(),
                                NettyShuffleMaster.INSTANCE,
@@ -448,7 +445,6 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                        Time allocationTimeout,
                        PartitionReleaseStrategy.Factory 
partitionReleaseStrategyFactory,
                        ShuffleMaster<?> shuffleMaster,
-                       boolean forcePartitionReleaseOnConsumption,
                        PartitionTracker partitionTracker,
                        ScheduleMode scheduleMode,
                        boolean allowQueuedScheduling) throws IOException {
@@ -511,8 +507,6 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
 
                this.shuffleMaster = checkNotNull(shuffleMaster);
 
-               this.forcePartitionReleaseOnConsumption = 
forcePartitionReleaseOnConsumption;
-
                this.partitionTracker = checkNotNull(partitionTracker);
 
                this.resultPartitionAvailabilityChecker = new 
ExecutionGraphResultPartitionAvailabilityChecker(
@@ -737,10 +731,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                return globalModVersion - 1;
        }
 
-       boolean isForcePartitionReleaseOnConsumption() {
-               return forcePartitionReleaseOnConsumption;
-       }
-
        @Override
        public ExecutionJobVertex getJobVertex(JobVertexID id) {
                return this.tasks.get(id);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 018fd81..adbd5fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -161,9 +161,6 @@ public class ExecutionGraphBuilder {
                final PartitionReleaseStrategy.Factory 
partitionReleaseStrategyFactory =
                        
PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);
 
-               final boolean forcePartitionReleaseOnConsumption =
-                       
jobManagerConfig.getBoolean(JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
-
                // create a new execution graph, if none exists so far
                final ExecutionGraph executionGraph;
                try {
@@ -182,7 +179,6 @@ public class ExecutionGraphBuilder {
                                        allocationTimeout,
                                        partitionReleaseStrategyFactory,
                                        shuffleMaster,
-                                       forcePartitionReleaseOnConsumption,
                                        partitionTracker,
                                        jobGraph.getScheduleMode(),
                                        jobGraph.getAllowQueuedScheduling());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 431360a..f266c77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -104,7 +104,8 @@ public class NettyShuffleServiceFactory implements 
ShuffleServiceFactory<NettySh
                        config.getBlockingSubpartitionType(),
                        config.networkBuffersPerChannel(),
                        config.floatingNetworkBuffersPerGate(),
-                       config.networkBufferSize());
+                       config.networkBufferSize(),
+                       config.isForcePartitionReleaseOnConsumption());
 
                SingleInputGateFactory singleInputGateFactory = new 
SingleInputGateFactory(
                        taskExecutorResourceId,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
index c52e8b1..f772b37 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
@@ -64,8 +64,8 @@ public class PartitionTrackerImpl implements PartitionTracker 
{
                Preconditions.checkNotNull(producingTaskExecutorId);
                Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);
 
-               // if it is released on consumption we do not need to issue any 
release calls
-               if 
(resultPartitionDeploymentDescriptor.isReleasedOnConsumption()) {
+               // only blocking partitions require explicit release call
+               if 
(!resultPartitionDeploymentDescriptor.getPartitionType().isBlocking()) {
                        return;
                }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 4d5cf23..b390987 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -63,6 +63,8 @@ public class ResultPartitionFactory {
 
        private final int networkBufferSize;
 
+       private final boolean forcePartitionReleaseOnConsumption;
+
        public ResultPartitionFactory(
                @Nonnull ResultPartitionManager partitionManager,
                @Nonnull FileChannelManager channelManager,
@@ -70,7 +72,8 @@ public class ResultPartitionFactory {
                BoundedBlockingSubpartitionType blockingSubpartitionType,
                int networkBuffersPerChannel,
                int floatingNetworkBuffersPerGate,
-               int networkBufferSize) {
+               int networkBufferSize,
+               boolean forcePartitionReleaseOnConsumption) {
 
                this.partitionManager = partitionManager;
                this.channelManager = channelManager;
@@ -79,6 +82,7 @@ public class ResultPartitionFactory {
                this.bufferPoolFactory = bufferPoolFactory;
                this.blockingSubpartitionType = blockingSubpartitionType;
                this.networkBufferSize = networkBufferSize;
+               this.forcePartitionReleaseOnConsumption = 
forcePartitionReleaseOnConsumption;
        }
 
        public ResultPartition create(
@@ -91,7 +95,6 @@ public class ResultPartitionFactory {
                        desc.getPartitionType(),
                        desc.getNumberOfSubpartitions(),
                        desc.getMaxParallelism(),
-                       desc.isReleasedOnConsumption(),
                        
createBufferPoolFactory(desc.getNumberOfSubpartitions(), 
desc.getPartitionType()));
        }
 
@@ -102,12 +105,11 @@ public class ResultPartitionFactory {
                @Nonnull ResultPartitionType type,
                int numberOfSubpartitions,
                int maxParallelism,
-               boolean releasePartitionOnConsumption,
                FunctionWithException<BufferPoolOwner, BufferPool, IOException> 
bufferPoolFactory) {
 
                ResultSubpartition[] subpartitions = new 
ResultSubpartition[numberOfSubpartitions];
 
-               ResultPartition partition = releasePartitionOnConsumption
+               ResultPartition partition = forcePartitionReleaseOnConsumption 
|| !type.isBlocking()
                        ? new ReleaseOnConsumptionResultPartition(
                                taskNameWithSubtaskAndId,
                                id,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
index 8086b27..f758bcc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import java.io.Serializable;
 import java.net.InetSocketAddress;
-import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -35,29 +34,19 @@ public class NettyShuffleDescriptor implements 
ShuffleDescriptor {
 
        private static final long serialVersionUID = 852181945034989215L;
 
-       private static final EnumSet<ReleaseType> 
SUPPORTED_RELEASE_TYPES_FOR_BLOCKING_PARTITIONS =
-               EnumSet.of(ReleaseType.AUTO, ReleaseType.MANUAL);
-
-       private static final EnumSet<ReleaseType> 
SUPPORTED_RELEASE_TYPES_FOR_NON_BLOCKING_PARTITIONS =
-               EnumSet.of(ReleaseType.AUTO);
-
        private final ResourceID producerLocation;
 
        private final PartitionConnectionInfo partitionConnectionInfo;
 
        private final ResultPartitionID resultPartitionID;
 
-       private final boolean isBlocking;
-
        public NettyShuffleDescriptor(
                        ResourceID producerLocation,
                        PartitionConnectionInfo partitionConnectionInfo,
-                       ResultPartitionID resultPartitionID,
-                       boolean isBlocking) {
+                       ResultPartitionID resultPartitionID) {
                this.producerLocation = producerLocation;
                this.partitionConnectionInfo = partitionConnectionInfo;
                this.resultPartitionID = resultPartitionID;
-               this.isBlocking = isBlocking;
        }
 
        public ConnectionID getConnectionId() {
@@ -74,12 +63,6 @@ public class NettyShuffleDescriptor implements 
ShuffleDescriptor {
                return Optional.of(producerLocation);
        }
 
-       @Override
-       public EnumSet<ReleaseType> getSupportedReleaseTypes() {
-               return isBlocking ?
-                       SUPPORTED_RELEASE_TYPES_FOR_BLOCKING_PARTITIONS : 
SUPPORTED_RELEASE_TYPES_FOR_NON_BLOCKING_PARTITIONS;
-       }
-
        public boolean isLocalTo(ResourceID consumerLocation) {
                return producerLocation.equals(consumerLocation);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index c369ff1..6c2cb32 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -42,8 +42,7 @@ public enum NettyShuffleMaster implements 
ShuffleMaster<NettyShuffleDescriptor>
                NettyShuffleDescriptor shuffleDeploymentDescriptor = new 
NettyShuffleDescriptor(
                        producerDescriptor.getProducerLocation(),
                        createConnectionInfo(producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
-                       resultPartitionID,
-                       partitionDescriptor.getPartitionType().isBlocking());
+                       resultPartitionID);
 
                return 
CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
index 5af56f2..17feacb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
@@ -19,12 +19,10 @@
 package org.apache.flink.runtime.shuffle;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -61,39 +59,11 @@ public interface ShuffleDescriptor extends Serializable {
         *
         * <p>Indicates that this partition occupies local resources in the 
producing task executor. Such partition requires
         * that the task executor is running and being connected to be able to 
consume the produced data. This is mostly
-        * relevant for the batch jobs and blocking result partitions which 
should outlive the producer lifetime and
-        * be released externally: {@link 
ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code false}.
+        * relevant for the batch jobs and blocking result partitions which can 
outlive the producer lifetime and
+        * be released externally.
         * {@link ShuffleEnvironment#releasePartitionsLocally(Collection)} can 
be used to release such kind of partitions locally.
         *
         * @return the resource id of the producing task executor if the 
partition occupies local resources there
         */
        Optional<ResourceID> storesLocalResourcesOn();
-
-       /**
-        * Return release types supported by Shuffle Service for this partition.
-        */
-       EnumSet<ReleaseType> getSupportedReleaseTypes();
-
-       /**
-        * Partition release type.
-        */
-       enum ReleaseType {
-               /**
-                * Auto-release the partition after having been fully consumed 
once.
-                *
-                * <p>No additional actions required, like {@link 
ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
-                * or {@link 
ShuffleEnvironment#releasePartitionsLocally(Collection)}
-                */
-               AUTO,
-
-               /**
-                * Manually release the partition, the partition has to support 
consumption multiple times.
-                *
-                * <p>The partition requires manual release once all 
consumption is done:
-                * {@link 
ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and
-                * if the partition occupies producer local resources ({@link 
#storesLocalResourcesOn()}) then also
-                * {@link 
ShuffleEnvironment#releasePartitionsLocally(Collection)}.
-                */
-               MANUAL
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
index ed66f2d..e6a4802 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -61,17 +60,18 @@ import java.util.Collection;
  *     <li>{@link ResultPartitionWriter#fail(Throwable)} and {@link 
ResultPartitionWriter#close()} are called
  *     if the production has failed.
  *     </li>
- *     <li>if {@link 
ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true} 
and
- *     {@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called when the production is done.
- *     The actual release can take some time depending on implementation 
details,
- *     e.g. if the `end of consumption' confirmation from the consumer is 
being awaited implicitly.
- *     The partition has to support the {@link ReleaseType#AUTO} in {@link 
ShuffleDescriptor#getSupportedReleaseTypes()}.</li>
- *     <li>if {@link 
ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code false} 
and
- *     {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and 
{@link ShuffleEnvironment#releasePartitionsLocally(Collection)},
- *     if it occupies any producer local resources ({@link 
ShuffleDescriptor#storesLocalResourcesOn()}),
- *     are called outside of the producer thread, e.g. to manage the lifecycle 
of BLOCKING result partitions
- *     which can outlive their producers. The partition has to support the 
{@link ReleaseType#MANUAL} in
- *     {@link ShuffleDescriptor#getSupportedReleaseTypes()}.</li>
+ *     <li>for PIPELINED partitions if there was a detected consumption 
attempt and it either failed or finished
+ *     after the bounded production has been done ({@link 
ResultPartitionWriter#finish()} and
+ *     {@link ResultPartitionWriter#close()} have been called). Only one 
consumption attempt is ever expected for
+ *     the PIPELINED partition at the moment so it can be released afterwards.
+ *     <li>if the following methods are called outside of the producer thread:
+ *     <ol>
+ *         <li>{@link 
ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}</li>
+ *         <li>and if it occupies any producer local resources ({@link 
ShuffleDescriptor#storesLocalResourcesOn()})
+ *             then also {@link 
ShuffleEnvironment#releasePartitionsLocally(Collection)}</li>
+ *     </ol>
+ *     e.g. to manage the lifecycle of BLOCKING result partitions which can 
outlive their producers.
+ *     The BLOCKING partitions can be consumed multiple times.</li>
  * </ol>
  * The partitions, which currently still occupy local resources, can be 
queried with
  * {@link ShuffleEnvironment#getPartitionsOccupyingLocalResources}.
@@ -132,8 +132,6 @@ public interface ShuffleEnvironment<P extends 
ResultPartitionWriter, G extends I
         *
         * <p>This is called for partitions which occupy resources locally
         * (can be checked by {@link 
ShuffleDescriptor#storesLocalResourcesOn()}).
-        * This method is not called if {@link 
ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
-        * The partition has to support the {@link ReleaseType#MANUAL} in 
{@link ShuffleDescriptor#getSupportedReleaseTypes()}.
         *
         * @param partitionIds identifying the partitions to be released
         */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
index a9ef1c6..9f729c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.shuffle;
 
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
-
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
@@ -51,8 +48,6 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> {
         *
         * <p>This call triggers release of any resources which are occupied by 
the given partition in the external systems
         * outside of the producer executor. This is mostly relevant for the 
batch jobs and blocking result partitions.
-        * This method is not called if {@link 
ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
-        * The partition has to support the {@link ReleaseType#MANUAL} in 
{@link ShuffleDescriptor#getSupportedReleaseTypes()}.
         * The producer local resources are managed by {@link 
ShuffleDescriptor#storesLocalResourcesOn()} and
         * {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
         *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
index 7c35516..339f343 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.shuffle;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
-import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -57,9 +56,4 @@ public final class UnknownShuffleDescriptor implements 
ShuffleDescriptor {
        public Optional<ResourceID> storesLocalResourcesOn() {
                return Optional.empty();
        }
-
-       @Override
-       public EnumSet<ReleaseType> getSupportedReleaseTypes() {
-               return EnumSet.noneOf(ReleaseType.class);
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8c99c0f..9b295dd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -613,8 +613,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
        private void setupResultPartitionBookkeeping(TaskDeploymentDescriptor 
tdd, CompletableFuture<ExecutionState> terminationFuture) {
                final List<ResultPartitionID> partitionsRequiringRelease = 
tdd.getProducedPartitions().stream()
-                       // partitions that are released on consumption don't 
require any explicit release call
-                       .filter(d -> !d.isReleasedOnConsumption())
+                       // only blocking partitions require explicit release 
call
+                       .filter(d -> d.getPartitionType().isBlocking())
                        
.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
                        // partitions without local resources don't store 
anything on the TaskExecutor
                        .filter(d -> d.storesLocalResourcesOn().isPresent())
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index 63eebb2..ba6bb77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -73,6 +73,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
        private final BoundedBlockingSubpartitionType blockingSubpartitionType;
 
+       private final boolean forcePartitionReleaseOnConsumption;
+
        public NettyShuffleEnvironmentConfiguration(
                        int numNetworkBuffers,
                        int networkBufferSize,
@@ -85,7 +87,8 @@ public class NettyShuffleEnvironmentConfiguration {
                        boolean isNetworkDetailedMetrics,
                        @Nullable NettyConfig nettyConfig,
                        String[] tempDirs,
-                       BoundedBlockingSubpartitionType 
blockingSubpartitionType) {
+                       BoundedBlockingSubpartitionType 
blockingSubpartitionType,
+                       boolean forcePartitionReleaseOnConsumption) {
 
                this.numNetworkBuffers = numNetworkBuffers;
                this.networkBufferSize = networkBufferSize;
@@ -99,6 +102,7 @@ public class NettyShuffleEnvironmentConfiguration {
                this.nettyConfig = nettyConfig;
                this.tempDirs = Preconditions.checkNotNull(tempDirs);
                this.blockingSubpartitionType = 
Preconditions.checkNotNull(blockingSubpartitionType);
+               this.forcePartitionReleaseOnConsumption = 
forcePartitionReleaseOnConsumption;
        }
 
        // 
------------------------------------------------------------------------
@@ -151,6 +155,10 @@ public class NettyShuffleEnvironmentConfiguration {
                return blockingSubpartitionType;
        }
 
+       public boolean isForcePartitionReleaseOnConsumption() {
+               return forcePartitionReleaseOnConsumption;
+       }
+
        // 
------------------------------------------------------------------------
 
        /**
@@ -194,6 +202,9 @@ public class NettyShuffleEnvironmentConfiguration {
 
                BoundedBlockingSubpartitionType blockingSubpartitionType = 
getBlockingSubpartitionType(configuration);
 
+               boolean forcePartitionReleaseOnConsumption =
+                       
configuration.getBoolean(NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
+
                return new NettyShuffleEnvironmentConfiguration(
                        numberOfNetworkBuffers,
                        pageSize,
@@ -206,7 +217,8 @@ public class NettyShuffleEnvironmentConfiguration {
                        isNetworkDetailedMetrics,
                        nettyConfig,
                        tempDirs,
-                       blockingSubpartitionType);
+                       blockingSubpartitionType,
+                       forcePartitionReleaseOnConsumption);
        }
 
        /**
@@ -529,6 +541,7 @@ public class NettyShuffleEnvironmentConfiguration {
                result = 31 * result + (isCreditBased ? 1 : 0);
                result = 31 * result + (nettyConfig != null ? 
nettyConfig.hashCode() : 0);
                result = 31 * result + Arrays.hashCode(tempDirs);
+               result = 31 * result + (forcePartitionReleaseOnConsumption ? 1 
: 0);
                return result;
        }
 
@@ -552,7 +565,8 @@ public class NettyShuffleEnvironmentConfiguration {
                                        
this.requestSegmentsTimeout.equals(that.requestSegmentsTimeout) &&
                                        this.isCreditBased == 
that.isCreditBased &&
                                        (nettyConfig != null ? 
nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null) &&
-                                       Arrays.equals(this.tempDirs, 
that.tempDirs);
+                                       Arrays.equals(this.tempDirs, 
that.tempDirs) &&
+                                       this.forcePartitionReleaseOnConsumption 
== that.forcePartitionReleaseOnConsumption;
                }
        }
 
@@ -569,6 +583,7 @@ public class NettyShuffleEnvironmentConfiguration {
                                ", isCreditBased=" + isCreditBased +
                                ", nettyConfig=" + nettyConfig +
                                ", tempDirs=" + Arrays.toString(tempDirs) +
+                               ", forcePartitionReleaseOnConsumption=" + 
forcePartitionReleaseOnConsumption +
                                '}';
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 49cd478..e76af09 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -30,9 +30,7 @@ import 
org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
 import 
org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
-import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -90,8 +88,7 @@ public class ResultPartitionDeploymentDescriptorTest extends 
TestLogger {
                ShuffleDescriptor shuffleDescriptor = new 
NettyShuffleDescriptor(
                        producerLocation,
                        new NetworkPartitionConnectionInfo(connectionID),
-                       resultPartitionID,
-                       false);
+                       resultPartitionID);
 
                ResultPartitionDeploymentDescriptor copy =
                        
createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);
@@ -104,16 +101,6 @@ public class ResultPartitionDeploymentDescriptorTest 
extends TestLogger {
                assertThat(shuffleDescriptorCopy.getConnectionId(), 
is(connectionID));
        }
 
-       @Test(expected = IllegalArgumentException.class)
-       public void testIncompatibleReleaseTypeManual() {
-               new ResultPartitionDeploymentDescriptor(
-                       partitionDescriptor,
-                       
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(false).buildLocal(),
-                       1,
-                       true,
-                       ReleaseType.MANUAL);
-       }
-
        private static ResultPartitionDeploymentDescriptor 
createCopyAndVerifyResultPartitionDeploymentDescriptor(
                        ShuffleDescriptor shuffleDescriptor) throws IOException 
{
                ResultPartitionDeploymentDescriptor orig = new 
ResultPartitionDeploymentDescriptor(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 2d360ba..96b6330 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -143,7 +143,8 @@ public class NettyShuffleEnvironmentBuilder {
                                isNetworkDetailedMetrics,
                                nettyConfig,
                                tempDirs,
-                               BoundedBlockingSubpartitionType.AUTO),
+                               BoundedBlockingSubpartitionType.AUTO,
+                               false),
                        taskManagerLocation,
                        taskEventDispatcher,
                        metricGroup);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 5f80421..cf50051 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -30,8 +29,6 @@ import 
org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.hamcrest.Matchers;
 
 import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Optional;
 
 import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.junit.Assert.assertThat;
@@ -97,8 +94,9 @@ public class PartitionTestUtils {
                }
        }
 
-       public static ResultPartitionDeploymentDescriptor 
createPartitionDeploymentDescriptor(ResultPartitionType partitionType) {
-               ShuffleDescriptor shuffleDescriptor = 
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal();
+       static ResultPartitionDeploymentDescriptor 
createPartitionDeploymentDescriptor(
+               ResultPartitionType partitionType) {
+               ShuffleDescriptor shuffleDescriptor = 
NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
                PartitionDescriptor partitionDescriptor = new 
PartitionDescriptor(
                        new IntermediateDataSetID(),
                        
shuffleDescriptor.getResultPartitionID().getPartitionId(),
@@ -112,52 +110,8 @@ public class PartitionTestUtils {
                        true);
        }
 
-       public static ResultPartitionDeploymentDescriptor 
createPartitionDeploymentDescriptor(ShuffleDescriptor.ReleaseType releaseType) {
-               // set partition to blocking to support all release types
-               ShuffleDescriptor shuffleDescriptor = 
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(true).buildLocal();
-               PartitionDescriptor partitionDescriptor = new 
PartitionDescriptor(
-                       new IntermediateDataSetID(),
-                       
shuffleDescriptor.getResultPartitionID().getPartitionId(),
-                       ResultPartitionType.BLOCKING,
-                       1,
-                       0);
-               return new ResultPartitionDeploymentDescriptor(
-                       partitionDescriptor,
-                       shuffleDescriptor,
-                       1,
-                       true,
-                       releaseType);
-       }
-
-       public static ResultPartitionDeploymentDescriptor 
createResultPartitionDeploymentDescriptor(ResultPartitionID resultPartitionId, 
ShuffleDescriptor.ReleaseType releaseType, boolean hasLocalResources) {
-               return new ResultPartitionDeploymentDescriptor(
-                       new PartitionDescriptor(
-                               new IntermediateDataSetID(),
-                               resultPartitionId.getPartitionId(),
-                               ResultPartitionType.BLOCKING,
-                               1,
-                               0),
-                       new ShuffleDescriptor() {
-                               @Override
-                               public ResultPartitionID getResultPartitionID() 
{
-                                       return resultPartitionId;
-                               }
-
-                               @Override
-                               public Optional<ResourceID> 
storesLocalResourcesOn() {
-                                       return hasLocalResources
-                                               ? 
Optional.of(ResourceID.generate())
-                                               : Optional.empty();
-                               }
-
-                               @Override
-                               public EnumSet<ReleaseType> 
getSupportedReleaseTypes() {
-                                       return EnumSet.of(releaseType);
-                               }
-                       },
-                       1,
-                       true,
-                       releaseType);
+       public static ResultPartitionDeploymentDescriptor 
createPartitionDeploymentDescriptor() {
+               return 
createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
        }
 
        public static void writeBuffers(ResultPartition partition, int 
numberOfBuffers, int bufferSize) throws IOException {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
index 07aba93..5ca7156 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
@@ -34,14 +34,12 @@ import org.junit.Test;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
@@ -53,15 +51,15 @@ public class PartitionTrackerImplTest extends TestLogger {
 
        @Test
        public void testReleasedOnConsumptionPartitionIsNotTracked() {
-               
testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType.AUTO);
+               testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED);
        }
 
        @Test
        public void testRetainedOnConsumptionPartitionIsTracked() {
-               
testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType.MANUAL);
+               testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING);
        }
 
-       private void 
testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType releaseType) {
+       private void testReleaseOnConsumptionHandling(ResultPartitionType 
resultPartitionType) {
                final PartitionTracker partitionTracker = new 
PartitionTrackerImpl(
                        new JobID(),
                        new TestingShuffleMaster(),
@@ -74,10 +72,11 @@ public class PartitionTrackerImplTest extends TestLogger {
                        resourceId,
                        createResultPartitionDeploymentDescriptor(
                                resultPartitionId,
-                               releaseType,
+                               resultPartitionType,
                                false));
 
-               
assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), 
is(not(releaseType)));
+               final boolean isTrackingExpected = resultPartitionType == 
ResultPartitionType.BLOCKING;
+               
assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), 
is(isTrackingExpected));
        }
 
        @Test
@@ -97,7 +96,7 @@ public class PartitionTrackerImplTest extends TestLogger {
 
                partitionTracker.startTrackingPartition(
                        executorWithTrackedPartition,
-                       createResultPartitionDeploymentDescriptor(new 
ResultPartitionID(), ShuffleDescriptor.ReleaseType.MANUAL, true));
+                       createResultPartitionDeploymentDescriptor(new 
ResultPartitionID(), true));
 
                
assertThat(partitionTracker.isTrackingPartitionsFor(executorWithTrackedPartition),
 is(true));
                
assertThat(partitionTracker.isTrackingPartitionsFor(executorWithoutTrackedPartition),
 is(false));
@@ -127,10 +126,10 @@ public class PartitionTrackerImplTest extends TestLogger {
 
                partitionTracker.startTrackingPartition(
                        taskExecutorId1,
-                       
createResultPartitionDeploymentDescriptor(resultPartitionId1, 
ShuffleDescriptor.ReleaseType.MANUAL, true));
+                       
createResultPartitionDeploymentDescriptor(resultPartitionId1, true));
                partitionTracker.startTrackingPartition(
                        taskExecutorId2,
-                       
createResultPartitionDeploymentDescriptor(resultPartitionId2, 
ShuffleDescriptor.ReleaseType.MANUAL, true));
+                       
createResultPartitionDeploymentDescriptor(resultPartitionId2, true));
 
                {
                        
partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1);
@@ -183,10 +182,10 @@ public class PartitionTrackerImplTest extends TestLogger {
 
                partitionTracker.startTrackingPartition(
                        taskExecutorId1,
-                       
createResultPartitionDeploymentDescriptor(resultPartitionId1, 
ShuffleDescriptor.ReleaseType.MANUAL, false));
+                       
createResultPartitionDeploymentDescriptor(resultPartitionId1, false));
                partitionTracker.startTrackingPartition(
                        taskExecutorId2,
-                       
createResultPartitionDeploymentDescriptor(resultPartitionId2, 
ShuffleDescriptor.ReleaseType.MANUAL, false));
+                       
createResultPartitionDeploymentDescriptor(resultPartitionId2, false));
 
                {
                        
partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1);
@@ -227,7 +226,7 @@ public class PartitionTrackerImplTest extends TestLogger {
 
                partitionTracker.startTrackingPartition(
                        taskExecutorId1,
-                       
createResultPartitionDeploymentDescriptor(resultPartitionId1, 
ShuffleDescriptor.ReleaseType.MANUAL, true));
+                       
createResultPartitionDeploymentDescriptor(resultPartitionId1, true));
 
                partitionTracker.stopTrackingPartitionsFor(taskExecutorId1);
 
@@ -236,15 +235,21 @@ public class PartitionTrackerImplTest extends TestLogger {
        }
 
        private static ResultPartitionDeploymentDescriptor 
createResultPartitionDeploymentDescriptor(
+                       ResultPartitionID resultPartitionId,
+                       boolean hasLocalResources) {
+               return 
createResultPartitionDeploymentDescriptor(resultPartitionId, 
ResultPartitionType.BLOCKING, hasLocalResources);
+       }
+
+       private static ResultPartitionDeploymentDescriptor 
createResultPartitionDeploymentDescriptor(
                ResultPartitionID resultPartitionId,
-               ShuffleDescriptor.ReleaseType releaseType,
+               ResultPartitionType type,
                boolean hasLocalResources) {
 
                return new ResultPartitionDeploymentDescriptor(
                        new PartitionDescriptor(
                                new IntermediateDataSetID(),
                                resultPartitionId.getPartitionId(),
-                               ResultPartitionType.BLOCKING,
+                               type,
                                1,
                                0),
                        new ShuffleDescriptor() {
@@ -259,15 +264,9 @@ public class PartitionTrackerImplTest extends TestLogger {
                                                ? 
Optional.of(ResourceID.generate())
                                                : Optional.empty();
                                }
-
-                               @Override
-                               public EnumSet<ReleaseType> 
getSupportedReleaseTypes() {
-                                       return EnumSet.of(releaseType);
-                               }
                        },
                        1,
-                       true,
-                       releaseType);
+                       true);
        }
 
        private static TaskExecutorGateway createTaskExecutorGateway(ResourceID 
taskExecutorId, Collection<Tuple3<ResourceID, JobID, 
Collection<ResultPartitionID>>> releaseCalls) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 8eb68d3..27eaab8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -142,7 +142,8 @@ public class ResultPartitionBuilder {
                        blockingSubpartitionType,
                        networkBuffersPerChannel,
                        floatingNetworkBuffersPerGate,
-                       networkBufferSize);
+                       networkBufferSize,
+                       releasedOnConsumption);
 
                FunctionWithException<BufferPoolOwner, BufferPool, IOException> 
factory = bufferPoolFactory.orElseGet(() ->
                        
resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions, 
partitionType));
@@ -153,7 +154,6 @@ public class ResultPartitionBuilder {
                        partitionType,
                        numberOfSubpartitions,
                        numTargetKeyGroups,
-                       releasedOnConsumption,
                        factory);
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index b2a4d16..8065829 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
@@ -57,18 +56,26 @@ public class ResultPartitionFactoryTest extends TestLogger {
        }
 
        @Test
-       public void testConsumptionOnReleaseEnabled() {
-               final ResultPartition resultPartition = 
createResultPartition(ShuffleDescriptor.ReleaseType.AUTO);
+       public void testConsumptionOnReleaseForced() {
+               final ResultPartition resultPartition = 
createResultPartition(true, ResultPartitionType.BLOCKING);
+               assertThat(resultPartition, 
instanceOf(ReleaseOnConsumptionResultPartition.class));
+       }
+
+       @Test
+       public void testConsumptionOnReleaseEnabledForNonBlocking() {
+               final ResultPartition resultPartition = 
createResultPartition(false, ResultPartitionType.PIPELINED);
                assertThat(resultPartition, 
instanceOf(ReleaseOnConsumptionResultPartition.class));
        }
 
        @Test
        public void testConsumptionOnReleaseDisabled() {
-               final ResultPartition resultPartition = 
createResultPartition(ShuffleDescriptor.ReleaseType.MANUAL);
+               final ResultPartition resultPartition = 
createResultPartition(false, ResultPartitionType.BLOCKING);
                assertThat(resultPartition, 
not(instanceOf(ReleaseOnConsumptionResultPartition.class)));
        }
 
-       private static ResultPartition 
createResultPartition(ShuffleDescriptor.ReleaseType releaseType) {
+       private static ResultPartition createResultPartition(
+                       boolean releasePartitionOnConsumption,
+                       ResultPartitionType partitionType) {
                ResultPartitionFactory factory = new ResultPartitionFactory(
                        new ResultPartitionManager(),
                        fileChannelManager,
@@ -76,9 +83,9 @@ public class ResultPartitionFactoryTest extends TestLogger {
                        BoundedBlockingSubpartitionType.AUTO,
                        1,
                        1,
-                       64);
+                       64,
+                       releasePartitionOnConsumption);
 
-               ResultPartitionType partitionType = 
ResultPartitionType.BLOCKING;
                final ResultPartitionDeploymentDescriptor descriptor = new 
ResultPartitionDeploymentDescriptor(
                        new PartitionDescriptor(
                                new IntermediateDataSetID(),
@@ -86,10 +93,9 @@ public class ResultPartitionFactoryTest extends TestLogger {
                                partitionType,
                                1,
                                0),
-                       
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
+                       NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
                        1,
-                       true,
-                       releaseType
+                       true
                );
 
                return factory.create("test", descriptor);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index f5999e9..d87cf84 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -57,7 +57,6 @@ import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
-import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -217,7 +216,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
                boolean waitForRelease) throws Exception {
 
                final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor =
-                       PartitionTestUtils.createPartitionDeploymentDescriptor(ShuffleDescriptor.ReleaseType.MANUAL);
+                       PartitionTestUtils.createPartitionDeploymentDescriptor();
                final ExecutionAttemptID eid1 = taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId();
 
                final TaskDeploymentDescriptor taskDeploymentDescriptor =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
index 0ecd81a..2d58d03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
@@ -40,8 +40,6 @@ public class NettyShuffleDescriptorBuilder {
        private InetAddress address = InetAddress.getLoopbackAddress();
        private int dataPort = 0;
        private int connectionIndex = 0;
-       private boolean isBlocking;
-
        public NettyShuffleDescriptorBuilder setProducerLocation(ResourceID producerLocation) {
                this.producerLocation = producerLocation;
                return this;
@@ -74,26 +72,19 @@ public class NettyShuffleDescriptorBuilder {
                return this;
        }
 
-       public NettyShuffleDescriptorBuilder setBlocking(boolean isBlocking) {
-               this.isBlocking = isBlocking;
-               return this;
-       }
-
        public NettyShuffleDescriptor buildRemote() {
                ConnectionID connectionID = new ConnectionID(new InetSocketAddress(address, dataPort), connectionIndex);
                return new NettyShuffleDescriptor(
                        producerLocation,
                        new NetworkPartitionConnectionInfo(connectionID),
-                       id,
-                       isBlocking);
+                       id);
        }
 
        public NettyShuffleDescriptor buildLocal() {
                return new NettyShuffleDescriptor(
                        producerLocation,
                        LocalExecutionPartitionConnectionInfo.INSTANCE,
-                       id,
-                       isBlocking);
+                       id);
        }
 
        public static NettyShuffleDescriptorBuilder newBuilder() {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
index 1549864..9e90c20 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
@@ -58,7 +58,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
-import static org.apache.flink.configuration.JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION;
+import static org.apache.flink.configuration.NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION;
 import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;

Reply via email to