This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d224759  [FLINK-12993][runtime] Refactor forceReleaseOnConsumption to 
JM concept
d224759 is described below

commit d2247598897b273dbadfda0ab4c05e3678807ff5
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed Jun 26 16:47:45 2019 +0200

    [FLINK-12993][runtime] Refactor forceReleaseOnConsumption to JM concept
    
    The forceReleaseOnConsumption option/flag is now only evaluated on the 
JobMaster side and is transparent to task managers.
---
 .../flink/configuration/JobManagerOptions.java     |  5 ++++
 .../NettyShuffleEnvironmentOptions.java            |  6 -----
 .../ResultPartitionDeploymentDescriptor.java       |  6 ++---
 .../flink/runtime/executiongraph/Execution.java    | 10 +++++++-
 .../runtime/executiongraph/ExecutionGraph.java     | 14 +++++++++--
 .../executiongraph/ExecutionGraphBuilder.java      |  6 ++++-
 .../io/network/NettyShuffleServiceFactory.java     |  3 +--
 .../network/partition/ResultPartitionFactory.java  |  8 ++-----
 .../NettyShuffleEnvironmentConfiguration.java      | 12 ----------
 .../ResultPartitionDeploymentDescriptorTest.java   | 18 ---------------
 .../io/network/NettyShuffleEnvironmentBuilder.java |  1 -
 .../io/network/partition/PartitionTestUtils.java   | 17 ++++++++++++++
 .../network/partition/ResultPartitionBuilder.java  |  3 +--
 .../partition/ResultPartitionFactoryTest.java      | 27 ++++++++++------------
 .../TaskExecutorPartitionLifecycleTest.java        |  4 ++--
 15 files changed, 69 insertions(+), 71 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index a1d55b1..89515fd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -174,6 +174,11 @@ public class JobManagerOptions {
                                        text("'ng': new generation scheduler"))
                                .build());
 
+       @Documentation.ExcludeFromDocumentation("dev use only; likely 
temporary")
+       public static final ConfigOption<Boolean> 
FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
+                       
key("jobmanager.scheduler.partition.force-release-on-consumption")
+                       .defaultValue(true);
+
        // 
---------------------------------------------------------------------------------------------
 
        private JobManagerOptions() {
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index abb86e6..699bd90 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -21,7 +21,6 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.ConfigGroup;
 import org.apache.flink.annotation.docs.ConfigGroups;
-import org.apache.flink.annotation.docs.Documentation;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -210,11 +209,6 @@ public class NettyShuffleEnvironmentOptions {
                        
.withDeprecatedKeys("taskmanager.net.request-backoff.max")
                        .withDescription("Maximum backoff in milliseconds for 
partition requests of input channels.");
 
-       @Documentation.ExcludeFromDocumentation("dev use only; likely 
temporary")
-       public static final ConfigOption<Boolean> 
FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
-                       
key("taskmanager.network.partition.force-release-on-consumption")
-                       .defaultValue(true);
-
        // 
------------------------------------------------------------------------
 
        /** Not intended to be instantiated. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 164e960..39b61d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.deployment;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -55,6 +56,7 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
 
        private final ReleaseType releaseType;
 
+       @VisibleForTesting
        public ResultPartitionDeploymentDescriptor(
                        PartitionDescriptor partitionDescriptor,
                        ShuffleDescriptor shuffleDescriptor,
@@ -65,9 +67,7 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                        shuffleDescriptor,
                        maxParallelism,
                        sendScheduleOrUpdateConsumersMessage,
-                       // Later we might have to make the scheduling adjust 
automatically
-                       // if certain release type is not supported by shuffle 
service implementation at hand
-                       partitionDescriptor.getPartitionType() == 
ResultPartitionType.BLOCKING ? ReleaseType.MANUAL : ReleaseType.AUTO);
+                       ReleaseType.AUTO);
        }
 
        public ResultPartitionDeploymentDescriptor(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 910d8bf..c81ec18 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -650,12 +650,20 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                .getExecutionGraph()
                                .getShuffleMaster()
                                
.registerPartitionWithProducer(partitionDescriptor, producerDescriptor);
+
+                       final boolean releasePartitionOnConsumption =
+                               
vertex.getExecutionGraph().isForcePartitionReleaseOnConsumption()
+                               || 
!partitionDescriptor.getPartitionType().isBlocking();
+
                        CompletableFuture<ResultPartitionDeploymentDescriptor> 
partitionRegistration = shuffleDescriptorFuture
                                .thenApply(shuffleDescriptor -> new 
ResultPartitionDeploymentDescriptor(
                                        partitionDescriptor,
                                        shuffleDescriptor,
                                        maxParallelism,
-                                       lazyScheduling));
+                                       lazyScheduling,
+                                       releasePartitionOnConsumption
+                                               ? 
ShuffleDescriptor.ReleaseType.AUTO
+                                               : 
ShuffleDescriptor.ReleaseType.MANUAL));
                        partitionRegistrations.add(partitionRegistration);
                }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 4050622..0c20ee5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -305,6 +305,8 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
        /** Shuffle master to register partitions for task deployment. */
        private final ShuffleMaster<?> shuffleMaster;
 
+       private boolean forcePartitionReleaseOnConsumption;
+
        // 
--------------------------------------------------------------------------------------------
        //   Constructors
        // 
--------------------------------------------------------------------------------------------
@@ -406,7 +408,8 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                        userClassLoader,
                        blobWriter,
                        allocationTimeout,
-                       NettyShuffleMaster.INSTANCE);
+                       NettyShuffleMaster.INSTANCE,
+                       true);
        }
 
        public ExecutionGraph(
@@ -421,7 +424,8 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                        ClassLoader userClassLoader,
                        BlobWriter blobWriter,
                        Time allocationTimeout,
-                       ShuffleMaster<?> shuffleMaster) throws IOException {
+                       ShuffleMaster<?> shuffleMaster,
+                       boolean forcePartitionReleaseOnConsumption) throws 
IOException {
 
                checkNotNull(futureExecutor);
 
@@ -471,6 +475,8 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
 
                this.shuffleMaster = checkNotNull(shuffleMaster);
 
+               this.forcePartitionReleaseOnConsumption = 
forcePartitionReleaseOnConsumption;
+
                LOG.info("Job recovers via failover strategy: {}", 
failoverStrategy.getStrategyName());
        }
 
@@ -697,6 +703,10 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                return globalModVersion - 1;
        }
 
+       boolean isForcePartitionReleaseOnConsumption() {
+               return forcePartitionReleaseOnConsumption;
+       }
+
        @Override
        public ExecutionJobVertex getJobVertex(JobVertexID id) {
                return this.tasks.get(id);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ea7e124..c4eead1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -115,6 +115,9 @@ public class ExecutionGraphBuilder {
                final int maxPriorAttemptsHistoryLength =
                                
jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
 
+               final boolean forcePartitionReleaseOnConsumption =
+                       
jobManagerConfig.getBoolean(JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
+
                // create a new execution graph, if none exists so far
                final ExecutionGraph executionGraph;
                try {
@@ -131,7 +134,8 @@ public class ExecutionGraphBuilder {
                                        classLoader,
                                        blobWriter,
                                        allocationTimeout,
-                                       shuffleMaster);
+                                       shuffleMaster,
+                                       forcePartitionReleaseOnConsumption);
                } catch (IOException e) {
                        throw new JobException("Could not create the 
ExecutionGraph.", e);
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 15bdfbc..74dbe0f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -99,8 +99,7 @@ public class NettyShuffleServiceFactory implements 
ShuffleServiceFactory<NettySh
                        ioManager,
                        networkBufferPool,
                        config.networkBuffersPerChannel(),
-                       config.floatingNetworkBuffersPerGate(),
-                       config.isForcePartitionReleaseOnConsumption());
+                       config.floatingNetworkBuffersPerGate());
 
                SingleInputGateFactory singleInputGateFactory = new 
SingleInputGateFactory(
                        taskExecutorResourceId,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index d15edaa..9479653 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -57,22 +57,18 @@ public class ResultPartitionFactory {
 
        private final int floatingNetworkBuffersPerGate;
 
-       private final boolean forcePartitionReleaseOnConsumption;
-
        public ResultPartitionFactory(
                @Nonnull ResultPartitionManager partitionManager,
                @Nonnull IOManager ioManager,
                @Nonnull BufferPoolFactory bufferPoolFactory,
                int networkBuffersPerChannel,
-               int floatingNetworkBuffersPerGate,
-               boolean forcePartitionReleaseOnConsumption) {
+               int floatingNetworkBuffersPerGate) {
 
                this.partitionManager = partitionManager;
                this.ioManager = ioManager;
                this.networkBuffersPerChannel = networkBuffersPerChannel;
                this.floatingNetworkBuffersPerGate = 
floatingNetworkBuffersPerGate;
                this.bufferPoolFactory = bufferPoolFactory;
-               this.forcePartitionReleaseOnConsumption = 
forcePartitionReleaseOnConsumption;
        }
 
        public ResultPartition create(
@@ -86,7 +82,7 @@ public class ResultPartitionFactory {
                        desc.getPartitionType(),
                        desc.getNumberOfSubpartitions(),
                        desc.getMaxParallelism(),
-                       desc.isReleasedOnConsumption() || 
forcePartitionReleaseOnConsumption,
+                       desc.isReleasedOnConsumption(),
                        
createBufferPoolFactory(desc.getNumberOfSubpartitions(), 
desc.getPartitionType()));
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index b439f08..a6ff17b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -60,8 +60,6 @@ public class NettyShuffleEnvironmentConfiguration {
 
        private final boolean isNetworkDetailedMetrics;
 
-       private final boolean forcePartitionReleaseOnConsumption;
-
        private final NettyConfig nettyConfig;
 
        public NettyShuffleEnvironmentConfiguration(
@@ -73,7 +71,6 @@ public class NettyShuffleEnvironmentConfiguration {
                        int floatingNetworkBuffersPerGate,
                        boolean isCreditBased,
                        boolean isNetworkDetailedMetrics,
-                       boolean forcePartitionReleaseOnConsumption,
                        @Nullable NettyConfig nettyConfig) {
 
                this.numNetworkBuffers = numNetworkBuffers;
@@ -84,7 +81,6 @@ public class NettyShuffleEnvironmentConfiguration {
                this.floatingNetworkBuffersPerGate = 
floatingNetworkBuffersPerGate;
                this.isCreditBased = isCreditBased;
                this.isNetworkDetailedMetrics = isNetworkDetailedMetrics;
-               this.forcePartitionReleaseOnConsumption = 
forcePartitionReleaseOnConsumption;
                this.nettyConfig = nettyConfig;
        }
 
@@ -126,10 +122,6 @@ public class NettyShuffleEnvironmentConfiguration {
                return isNetworkDetailedMetrics;
        }
 
-       public boolean isForcePartitionReleaseOnConsumption() {
-               return forcePartitionReleaseOnConsumption;
-       }
-
        // 
------------------------------------------------------------------------
 
        /**
@@ -166,9 +158,6 @@ public class NettyShuffleEnvironmentConfiguration {
 
                boolean isNetworkDetailedMetrics = 
configuration.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_DETAILED_METRICS);
 
-               boolean forcePartitionReleaseOnConsumption = 
configuration.getBoolean(
-                       
NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
-
                return new NettyShuffleEnvironmentConfiguration(
                        numberOfNetworkBuffers,
                        pageSize,
@@ -178,7 +167,6 @@ public class NettyShuffleEnvironmentConfiguration {
                        extraBuffersPerGate,
                        isCreditBased,
                        isNetworkDetailedMetrics,
-                       forcePartitionReleaseOnConsumption,
                        nettyConfig);
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 1439f00..49cd478 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -104,24 +104,6 @@ public class ResultPartitionDeploymentDescriptorTest 
extends TestLogger {
                assertThat(shuffleDescriptorCopy.getConnectionId(), 
is(connectionID));
        }
 
-       @Test
-       public void testReleasedOnConsumptionFlag() {
-               for (ResultPartitionType partitionType : 
ResultPartitionType.values()) {
-                       ResultPartitionDeploymentDescriptor partitionDescriptor 
= new ResultPartitionDeploymentDescriptor(
-                               new PartitionDescriptor(resultId, partitionId, 
partitionType, numberOfSubpartitions, connectionIndex),
-                               
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
-                               1,
-                               true
-                       );
-
-                       if (partitionType == ResultPartitionType.BLOCKING) {
-                               
assertThat(partitionDescriptor.isReleasedOnConsumption(), is(false));
-                       } else {
-                               
assertThat(partitionDescriptor.isReleasedOnConsumption(), is(true));
-                       }
-               }
-       }
-
        @Test(expected = IllegalArgumentException.class)
        public void testIncompatibleReleaseTypeManual() {
                new ResultPartitionDeploymentDescriptor(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 9f0a9ce..b0ef430 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -128,7 +128,6 @@ public class NettyShuffleEnvironmentBuilder {
                                floatingNetworkBuffersPerGate,
                                isCreditBased,
                                isNetworkDetailedMetrics,
-                               true,
                                nettyConfig),
                        taskManagerLocation,
                        taskEventDispatcher,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 870a745..007fdc3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -84,4 +84,21 @@ public class PartitionTestUtils {
                        1,
                        true);
        }
+
+       public static ResultPartitionDeploymentDescriptor 
createPartitionDeploymentDescriptor(ShuffleDescriptor.ReleaseType releaseType) {
+               // set partition to blocking to support all release types
+               ShuffleDescriptor shuffleDescriptor = 
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(true).buildLocal();
+               PartitionDescriptor partitionDescriptor = new 
PartitionDescriptor(
+                       new IntermediateDataSetID(),
+                       
shuffleDescriptor.getResultPartitionID().getPartitionId(),
+                       ResultPartitionType.BLOCKING,
+                       1,
+                       0);
+               return new ResultPartitionDeploymentDescriptor(
+                       partitionDescriptor,
+                       shuffleDescriptor,
+                       1,
+                       true,
+                       releaseType);
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index f1ba7fb..bf403ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -125,8 +125,7 @@ public class ResultPartitionBuilder {
                        ioManager,
                        networkBufferPool,
                        networkBuffersPerChannel,
-                       floatingNetworkBuffersPerGate,
-                       true);
+                       floatingNetworkBuffersPerGate);
 
                FunctionWithException<BufferPoolOwner, BufferPool, IOException> 
factory = bufferPoolFactory.orElseGet(() ->
                        
resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions, 
partitionType));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 1de51fa..8e95f8a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
 
@@ -39,23 +40,24 @@ import static org.hamcrest.MatcherAssert.assertThat;
 public class ResultPartitionFactoryTest extends TestLogger {
 
        @Test
-       public void testForceConsumptionOnReleaseEnabled() {
-               testForceConsumptionOnRelease(true);
+       public void testConsumptionOnReleaseEnabled() {
+               final ResultPartition resultPartition = 
createResultPartition(ShuffleDescriptor.ReleaseType.AUTO);
+               assertThat(resultPartition, 
instanceOf(ReleaseOnConsumptionResultPartition.class));
        }
 
        @Test
-       public void testForceConsumptionOnReleaseDisabled() {
-               testForceConsumptionOnRelease(false);
+       public void testConsumptionOnReleaseDisabled() {
+               final ResultPartition resultPartition = 
createResultPartition(ShuffleDescriptor.ReleaseType.MANUAL);
+               assertThat(resultPartition, 
not(instanceOf(ReleaseOnConsumptionResultPartition.class)));
        }
 
-       private static void testForceConsumptionOnRelease(boolean 
forceConsumptionOnRelease) {
+       private static ResultPartition 
createResultPartition(ShuffleDescriptor.ReleaseType releaseType) {
                ResultPartitionFactory factory = new ResultPartitionFactory(
                        new ResultPartitionManager(),
                        new NoOpIOManager(),
                        new NetworkBufferPool(1, 64, 1),
                        1,
-                       1,
-                       forceConsumptionOnRelease
+                       1
                );
 
                ResultPartitionType partitionType = 
ResultPartitionType.BLOCKING;
@@ -68,15 +70,10 @@ public class ResultPartitionFactoryTest extends TestLogger {
                                0),
                        
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
                        1,
-                       true
+                       true,
+                       releaseType
                );
 
-               final ResultPartition test = factory.create("test", new 
ExecutionAttemptID(), descriptor);
-
-               if (forceConsumptionOnRelease) {
-                       assertThat(test, 
instanceOf(ReleaseOnConsumptionResultPartition.class));
-               } else {
-                       assertThat(test, 
not(instanceOf(ReleaseOnConsumptionResultPartition.class)));
-               }
+               return factory.create("test", new ExecutionAttemptID(), 
descriptor);
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 9ceacad..a616c71 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -44,7 +44,6 @@ import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvi
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -57,6 +56,7 @@ import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -216,7 +216,7 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                boolean waitForRelease) throws Exception {
 
                final ResultPartitionDeploymentDescriptor 
taskResultPartitionDescriptor =
-                       
PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
+                       
PartitionTestUtils.createPartitionDeploymentDescriptor(ShuffleDescriptor.ReleaseType.MANUAL);
                final ExecutionAttemptID eid1 = 
taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId();
 
                final TaskDeploymentDescriptor taskDeploymentDescriptor =

Reply via email to