This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d224759 [FLINK-12993][runtime] Refactor forceReleaseOnConsumption to
JM concept
d224759 is described below
commit d2247598897b273dbadfda0ab4c05e3678807ff5
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed Jun 26 16:47:45 2019 +0200
[FLINK-12993][runtime] Refactor forceReleaseOnConsumption to JM concept
The forceReleaseOnConsumption option/flag is now only evaluated on the
JobMaster side and transparent to task managers,
---
.../flink/configuration/JobManagerOptions.java | 5 ++++
.../NettyShuffleEnvironmentOptions.java | 6 -----
.../ResultPartitionDeploymentDescriptor.java | 6 ++---
.../flink/runtime/executiongraph/Execution.java | 10 +++++++-
.../runtime/executiongraph/ExecutionGraph.java | 14 +++++++++--
.../executiongraph/ExecutionGraphBuilder.java | 6 ++++-
.../io/network/NettyShuffleServiceFactory.java | 3 +--
.../network/partition/ResultPartitionFactory.java | 8 ++-----
.../NettyShuffleEnvironmentConfiguration.java | 12 ----------
.../ResultPartitionDeploymentDescriptorTest.java | 18 ---------------
.../io/network/NettyShuffleEnvironmentBuilder.java | 1 -
.../io/network/partition/PartitionTestUtils.java | 17 ++++++++++++++
.../network/partition/ResultPartitionBuilder.java | 3 +--
.../partition/ResultPartitionFactoryTest.java | 27 ++++++++++------------
.../TaskExecutorPartitionLifecycleTest.java | 4 ++--
15 files changed, 69 insertions(+), 71 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index a1d55b1..89515fd 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -174,6 +174,11 @@ public class JobManagerOptions {
text("'ng': new generation scheduler"))
.build());
+ @Documentation.ExcludeFromDocumentation("dev use only; likely
temporary")
+ public static final ConfigOption<Boolean>
FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
+
key("jobmanager.scheduler.partition.force-release-on-consumption")
+ .defaultValue(true);
+
//
---------------------------------------------------------------------------------------------
private JobManagerOptions() {
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index abb86e6..699bd90 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -21,7 +21,6 @@ package org.apache.flink.configuration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.ConfigGroup;
import org.apache.flink.annotation.docs.ConfigGroups;
-import org.apache.flink.annotation.docs.Documentation;
import static org.apache.flink.configuration.ConfigOptions.key;
@@ -210,11 +209,6 @@ public class NettyShuffleEnvironmentOptions {
.withDeprecatedKeys("taskmanager.net.request-backoff.max")
.withDescription("Maximum backoff in milliseconds for
partition requests of input channels.");
- @Documentation.ExcludeFromDocumentation("dev use only; likely
temporary")
- public static final ConfigOption<Boolean>
FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
-
key("taskmanager.network.partition.force-release-on-consumption")
- .defaultValue(true);
-
//
------------------------------------------------------------------------
/** Not intended to be instantiated. */
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 164e960..39b61d2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.deployment;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -55,6 +56,7 @@ public class ResultPartitionDeploymentDescriptor implements
Serializable {
private final ReleaseType releaseType;
+ @VisibleForTesting
public ResultPartitionDeploymentDescriptor(
PartitionDescriptor partitionDescriptor,
ShuffleDescriptor shuffleDescriptor,
@@ -65,9 +67,7 @@ public class ResultPartitionDeploymentDescriptor implements
Serializable {
shuffleDescriptor,
maxParallelism,
sendScheduleOrUpdateConsumersMessage,
- // Later we might have to make the scheduling adjust
automatically
- // if certain release type is not supported by shuffle
service implementation at hand
- partitionDescriptor.getPartitionType() ==
ResultPartitionType.BLOCKING ? ReleaseType.MANUAL : ReleaseType.AUTO);
+ ReleaseType.AUTO);
}
public ResultPartitionDeploymentDescriptor(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 910d8bf..c81ec18 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -650,12 +650,20 @@ public class Execution implements AccessExecution,
Archiveable<ArchivedExecution
.getExecutionGraph()
.getShuffleMaster()
.registerPartitionWithProducer(partitionDescriptor, producerDescriptor);
+
+ final boolean releasePartitionOnConsumption =
+
vertex.getExecutionGraph().isForcePartitionReleaseOnConsumption()
+ ||
!partitionDescriptor.getPartitionType().isBlocking();
+
CompletableFuture<ResultPartitionDeploymentDescriptor>
partitionRegistration = shuffleDescriptorFuture
.thenApply(shuffleDescriptor -> new
ResultPartitionDeploymentDescriptor(
partitionDescriptor,
shuffleDescriptor,
maxParallelism,
- lazyScheduling));
+ lazyScheduling,
+ releasePartitionOnConsumption
+ ?
ShuffleDescriptor.ReleaseType.AUTO
+ :
ShuffleDescriptor.ReleaseType.MANUAL));
partitionRegistrations.add(partitionRegistration);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 4050622..0c20ee5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -305,6 +305,8 @@ public class ExecutionGraph implements AccessExecutionGraph
{
/** Shuffle master to register partitions for task deployment. */
private final ShuffleMaster<?> shuffleMaster;
+ private boolean forcePartitionReleaseOnConsumption;
+
//
--------------------------------------------------------------------------------------------
// Constructors
//
--------------------------------------------------------------------------------------------
@@ -406,7 +408,8 @@ public class ExecutionGraph implements AccessExecutionGraph
{
userClassLoader,
blobWriter,
allocationTimeout,
- NettyShuffleMaster.INSTANCE);
+ NettyShuffleMaster.INSTANCE,
+ true);
}
public ExecutionGraph(
@@ -421,7 +424,8 @@ public class ExecutionGraph implements AccessExecutionGraph
{
ClassLoader userClassLoader,
BlobWriter blobWriter,
Time allocationTimeout,
- ShuffleMaster<?> shuffleMaster) throws IOException {
+ ShuffleMaster<?> shuffleMaster,
+ boolean forcePartitionReleaseOnConsumption) throws
IOException {
checkNotNull(futureExecutor);
@@ -471,6 +475,8 @@ public class ExecutionGraph implements AccessExecutionGraph
{
this.shuffleMaster = checkNotNull(shuffleMaster);
+ this.forcePartitionReleaseOnConsumption =
forcePartitionReleaseOnConsumption;
+
LOG.info("Job recovers via failover strategy: {}",
failoverStrategy.getStrategyName());
}
@@ -697,6 +703,10 @@ public class ExecutionGraph implements
AccessExecutionGraph {
return globalModVersion - 1;
}
+ boolean isForcePartitionReleaseOnConsumption() {
+ return forcePartitionReleaseOnConsumption;
+ }
+
@Override
public ExecutionJobVertex getJobVertex(JobVertexID id) {
return this.tasks.get(id);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index ea7e124..c4eead1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -115,6 +115,9 @@ public class ExecutionGraphBuilder {
final int maxPriorAttemptsHistoryLength =
jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
+ final boolean forcePartitionReleaseOnConsumption =
+
jobManagerConfig.getBoolean(JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
+
// create a new execution graph, if none exists so far
final ExecutionGraph executionGraph;
try {
@@ -131,7 +134,8 @@ public class ExecutionGraphBuilder {
classLoader,
blobWriter,
allocationTimeout,
- shuffleMaster);
+ shuffleMaster,
+ forcePartitionReleaseOnConsumption);
} catch (IOException e) {
throw new JobException("Could not create the
ExecutionGraph.", e);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 15bdfbc..74dbe0f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -99,8 +99,7 @@ public class NettyShuffleServiceFactory implements
ShuffleServiceFactory<NettySh
ioManager,
networkBufferPool,
config.networkBuffersPerChannel(),
- config.floatingNetworkBuffersPerGate(),
- config.isForcePartitionReleaseOnConsumption());
+ config.floatingNetworkBuffersPerGate());
SingleInputGateFactory singleInputGateFactory = new
SingleInputGateFactory(
taskExecutorResourceId,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index d15edaa..9479653 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -57,22 +57,18 @@ public class ResultPartitionFactory {
private final int floatingNetworkBuffersPerGate;
- private final boolean forcePartitionReleaseOnConsumption;
-
public ResultPartitionFactory(
@Nonnull ResultPartitionManager partitionManager,
@Nonnull IOManager ioManager,
@Nonnull BufferPoolFactory bufferPoolFactory,
int networkBuffersPerChannel,
- int floatingNetworkBuffersPerGate,
- boolean forcePartitionReleaseOnConsumption) {
+ int floatingNetworkBuffersPerGate) {
this.partitionManager = partitionManager;
this.ioManager = ioManager;
this.networkBuffersPerChannel = networkBuffersPerChannel;
this.floatingNetworkBuffersPerGate =
floatingNetworkBuffersPerGate;
this.bufferPoolFactory = bufferPoolFactory;
- this.forcePartitionReleaseOnConsumption =
forcePartitionReleaseOnConsumption;
}
public ResultPartition create(
@@ -86,7 +82,7 @@ public class ResultPartitionFactory {
desc.getPartitionType(),
desc.getNumberOfSubpartitions(),
desc.getMaxParallelism(),
- desc.isReleasedOnConsumption() ||
forcePartitionReleaseOnConsumption,
+ desc.isReleasedOnConsumption(),
createBufferPoolFactory(desc.getNumberOfSubpartitions(),
desc.getPartitionType()));
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index b439f08..a6ff17b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -60,8 +60,6 @@ public class NettyShuffleEnvironmentConfiguration {
private final boolean isNetworkDetailedMetrics;
- private final boolean forcePartitionReleaseOnConsumption;
-
private final NettyConfig nettyConfig;
public NettyShuffleEnvironmentConfiguration(
@@ -73,7 +71,6 @@ public class NettyShuffleEnvironmentConfiguration {
int floatingNetworkBuffersPerGate,
boolean isCreditBased,
boolean isNetworkDetailedMetrics,
- boolean forcePartitionReleaseOnConsumption,
@Nullable NettyConfig nettyConfig) {
this.numNetworkBuffers = numNetworkBuffers;
@@ -84,7 +81,6 @@ public class NettyShuffleEnvironmentConfiguration {
this.floatingNetworkBuffersPerGate =
floatingNetworkBuffersPerGate;
this.isCreditBased = isCreditBased;
this.isNetworkDetailedMetrics = isNetworkDetailedMetrics;
- this.forcePartitionReleaseOnConsumption =
forcePartitionReleaseOnConsumption;
this.nettyConfig = nettyConfig;
}
@@ -126,10 +122,6 @@ public class NettyShuffleEnvironmentConfiguration {
return isNetworkDetailedMetrics;
}
- public boolean isForcePartitionReleaseOnConsumption() {
- return forcePartitionReleaseOnConsumption;
- }
-
//
------------------------------------------------------------------------
/**
@@ -166,9 +158,6 @@ public class NettyShuffleEnvironmentConfiguration {
boolean isNetworkDetailedMetrics =
configuration.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_DETAILED_METRICS);
- boolean forcePartitionReleaseOnConsumption =
configuration.getBoolean(
-
NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);
-
return new NettyShuffleEnvironmentConfiguration(
numberOfNetworkBuffers,
pageSize,
@@ -178,7 +167,6 @@ public class NettyShuffleEnvironmentConfiguration {
extraBuffersPerGate,
isCreditBased,
isNetworkDetailedMetrics,
- forcePartitionReleaseOnConsumption,
nettyConfig);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 1439f00..49cd478 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -104,24 +104,6 @@ public class ResultPartitionDeploymentDescriptorTest
extends TestLogger {
assertThat(shuffleDescriptorCopy.getConnectionId(),
is(connectionID));
}
- @Test
- public void testReleasedOnConsumptionFlag() {
- for (ResultPartitionType partitionType :
ResultPartitionType.values()) {
- ResultPartitionDeploymentDescriptor partitionDescriptor
= new ResultPartitionDeploymentDescriptor(
- new PartitionDescriptor(resultId, partitionId,
partitionType, numberOfSubpartitions, connectionIndex),
-
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
- 1,
- true
- );
-
- if (partitionType == ResultPartitionType.BLOCKING) {
-
assertThat(partitionDescriptor.isReleasedOnConsumption(), is(false));
- } else {
-
assertThat(partitionDescriptor.isReleasedOnConsumption(), is(true));
- }
- }
- }
-
@Test(expected = IllegalArgumentException.class)
public void testIncompatibleReleaseTypeManual() {
new ResultPartitionDeploymentDescriptor(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 9f0a9ce..b0ef430 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -128,7 +128,6 @@ public class NettyShuffleEnvironmentBuilder {
floatingNetworkBuffersPerGate,
isCreditBased,
isNetworkDetailedMetrics,
- true,
nettyConfig),
taskManagerLocation,
taskEventDispatcher,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 870a745..007fdc3 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -84,4 +84,21 @@ public class PartitionTestUtils {
1,
true);
}
+
+ public static ResultPartitionDeploymentDescriptor
createPartitionDeploymentDescriptor(ShuffleDescriptor.ReleaseType releaseType) {
+ // set partition to blocking to support all release types
+ ShuffleDescriptor shuffleDescriptor =
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(true).buildLocal();
+ PartitionDescriptor partitionDescriptor = new
PartitionDescriptor(
+ new IntermediateDataSetID(),
+
shuffleDescriptor.getResultPartitionID().getPartitionId(),
+ ResultPartitionType.BLOCKING,
+ 1,
+ 0);
+ return new ResultPartitionDeploymentDescriptor(
+ partitionDescriptor,
+ shuffleDescriptor,
+ 1,
+ true,
+ releaseType);
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index f1ba7fb..bf403ac 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -125,8 +125,7 @@ public class ResultPartitionBuilder {
ioManager,
networkBufferPool,
networkBuffersPerChannel,
- floatingNetworkBuffersPerGate,
- true);
+ floatingNetworkBuffersPerGate);
FunctionWithException<BufferPoolOwner, BufferPool, IOException>
factory = bufferPoolFactory.orElseGet(() ->
resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions,
partitionType));
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 1de51fa..8e95f8a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -24,6 +24,7 @@ import
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.util.TestLogger;
@@ -39,23 +40,24 @@ import static org.hamcrest.MatcherAssert.assertThat;
public class ResultPartitionFactoryTest extends TestLogger {
@Test
- public void testForceConsumptionOnReleaseEnabled() {
- testForceConsumptionOnRelease(true);
+ public void testConsumptionOnReleaseEnabled() {
+ final ResultPartition resultPartition =
createResultPartition(ShuffleDescriptor.ReleaseType.AUTO);
+ assertThat(resultPartition,
instanceOf(ReleaseOnConsumptionResultPartition.class));
}
@Test
- public void testForceConsumptionOnReleaseDisabled() {
- testForceConsumptionOnRelease(false);
+ public void testConsumptionOnReleaseDisabled() {
+ final ResultPartition resultPartition =
createResultPartition(ShuffleDescriptor.ReleaseType.MANUAL);
+ assertThat(resultPartition,
not(instanceOf(ReleaseOnConsumptionResultPartition.class)));
}
- private static void testForceConsumptionOnRelease(boolean
forceConsumptionOnRelease) {
+ private static ResultPartition
createResultPartition(ShuffleDescriptor.ReleaseType releaseType) {
ResultPartitionFactory factory = new ResultPartitionFactory(
new ResultPartitionManager(),
new NoOpIOManager(),
new NetworkBufferPool(1, 64, 1),
1,
- 1,
- forceConsumptionOnRelease
+ 1
);
ResultPartitionType partitionType =
ResultPartitionType.BLOCKING;
@@ -68,15 +70,10 @@ public class ResultPartitionFactoryTest extends TestLogger {
0),
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
1,
- true
+ true,
+ releaseType
);
- final ResultPartition test = factory.create("test", new
ExecutionAttemptID(), descriptor);
-
- if (forceConsumptionOnRelease) {
- assertThat(test,
instanceOf(ReleaseOnConsumptionResultPartition.class));
- } else {
- assertThat(test,
not(instanceOf(ReleaseOnConsumptionResultPartition.class)));
- }
+ return factory.create("test", new ExecutionAttemptID(),
descriptor);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 9ceacad..a616c71 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -44,7 +44,6 @@ import
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvi
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -57,6 +56,7 @@ import
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -216,7 +216,7 @@ public class TaskExecutorPartitionLifecycleTest extends
TestLogger {
boolean waitForRelease) throws Exception {
final ResultPartitionDeploymentDescriptor
taskResultPartitionDescriptor =
-
PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
+
PartitionTestUtils.createPartitionDeploymentDescriptor(ShuffleDescriptor.ReleaseType.MANUAL);
final ExecutionAttemptID eid1 =
taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId();
final TaskDeploymentDescriptor taskDeploymentDescriptor =