This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d1ce8047ef2c9d04fb460448cd4dfc35c29602ae
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Tue Jun 25 11:29:45 2019 +0200

    [FLINK-12960][coordination][shuffle] Introduce 
ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes
    
    `ResultPartitionDeploymentDescriptor#releasedOnConsumption` shows the 
intention how the partition is going to be used by the shuffle user and 
released. The `ShuffleDescriptor` should provide a way to query which release 
type is supported by shuffle service for this partition. If the requested 
release type is not supported by the shuffle service for a certain type of 
partition, the job should fail fast.
---
 .../ResultPartitionDeploymentDescriptor.java       | 44 ++++++++++++++++++++--
 .../runtime/shuffle/NettyShuffleDescriptor.java    | 19 +++++++++-
 .../flink/runtime/shuffle/NettyShuffleMaster.java  |  3 +-
 .../flink/runtime/shuffle/ShuffleDescriptor.java   | 29 ++++++++++++++
 .../flink/runtime/shuffle/ShuffleEnvironment.java  |  8 +++-
 .../flink/runtime/shuffle/ShuffleMaster.java       |  2 +
 .../runtime/shuffle/UnknownShuffleDescriptor.java  |  6 +++
 .../ResultPartitionDeploymentDescriptorTest.java   | 16 +++++++-
 .../io/network/partition/PartitionTestUtils.java   |  2 +-
 .../partition/ResultPartitionFactoryTest.java      |  5 ++-
 .../util/NettyShuffleDescriptorBuilder.java        | 12 +++++-
 11 files changed, 131 insertions(+), 15 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index d627de3..164e960 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -31,6 +32,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import java.io.Serializable;
 import java.util.Collection;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -51,20 +53,49 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
        /** Flag whether the result partition should send 
scheduleOrUpdateConsumer messages. */
        private final boolean sendScheduleOrUpdateConsumersMessage;
 
-       /** Whether the result partition is released on consumption. */
-       private final boolean releasedOnConsumption;
+       private final ReleaseType releaseType;
 
        public ResultPartitionDeploymentDescriptor(
                        PartitionDescriptor partitionDescriptor,
                        ShuffleDescriptor shuffleDescriptor,
                        int maxParallelism,
                        boolean sendScheduleOrUpdateConsumersMessage) {
+               this(
+                       checkNotNull(partitionDescriptor),
+                       shuffleDescriptor,
+                       maxParallelism,
+                       sendScheduleOrUpdateConsumersMessage,
+                       // Later we might have to make the scheduling adjust 
automatically
+                       // if certain release type is not supported by shuffle 
service implementation at hand
+                       partitionDescriptor.getPartitionType() == 
ResultPartitionType.BLOCKING ? ReleaseType.MANUAL : ReleaseType.AUTO);
+       }
+
+       public ResultPartitionDeploymentDescriptor(
+                       PartitionDescriptor partitionDescriptor,
+                       ShuffleDescriptor shuffleDescriptor,
+                       int maxParallelism,
+                       boolean sendScheduleOrUpdateConsumersMessage,
+                       ReleaseType releaseType) {
+               
checkReleaseOnConsumptionIsSupportedForPartition(shuffleDescriptor, 
releaseType);
                this.partitionDescriptor = checkNotNull(partitionDescriptor);
                this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
                
KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
                this.maxParallelism = maxParallelism;
                this.sendScheduleOrUpdateConsumersMessage = 
sendScheduleOrUpdateConsumersMessage;
-               this.releasedOnConsumption = 
partitionDescriptor.getPartitionType() != ResultPartitionType.BLOCKING;
+               this.releaseType = releaseType;
+       }
+
+       private static void checkReleaseOnConsumptionIsSupportedForPartition(
+                       ShuffleDescriptor shuffleDescriptor,
+                       ReleaseType releaseType) {
+               checkNotNull(shuffleDescriptor);
+               checkArgument(
+                       
shuffleDescriptor.getSupportedReleaseTypes().contains(releaseType),
+                       "Release type %s is not supported by the shuffle 
service for this partition" +
+                               "(id: %s), supported release types: %s",
+                       releaseType,
+                       shuffleDescriptor.getResultPartitionID(),
+                       shuffleDescriptor.getSupportedReleaseTypes());
        }
 
        public IntermediateDataSetID getResultId() {
@@ -103,10 +134,15 @@ public class ResultPartitionDeploymentDescriptor 
implements Serializable {
         * by {@link 
ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
         * and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
         *
+        * <p>The partition has to support the corresponding {@link 
ReleaseType} in
+        * {@link ShuffleDescriptor#getSupportedReleaseTypes()}:
+        * {@link ReleaseType#AUTO} for {@code isReleasedOnConsumption()} to 
return {@code true} and
+        * {@link ReleaseType#MANUAL} for {@code isReleasedOnConsumption()} to 
return {@code false}.
+        *
         * @return whether to release the partition after having been fully 
consumed once.
         */
        public boolean isReleasedOnConsumption() {
-               return releasedOnConsumption;
+               return releaseType == ReleaseType.AUTO;
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
index f758bcc..8086b27 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import java.io.Serializable;
 import java.net.InetSocketAddress;
+import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -34,19 +35,29 @@ public class NettyShuffleDescriptor implements 
ShuffleDescriptor {
 
        private static final long serialVersionUID = 852181945034989215L;
 
+       private static final EnumSet<ReleaseType> 
SUPPORTED_RELEASE_TYPES_FOR_BLOCKING_PARTITIONS =
+               EnumSet.of(ReleaseType.AUTO, ReleaseType.MANUAL);
+
+       private static final EnumSet<ReleaseType> 
SUPPORTED_RELEASE_TYPES_FOR_NON_BLOCKING_PARTITIONS =
+               EnumSet.of(ReleaseType.AUTO);
+
        private final ResourceID producerLocation;
 
        private final PartitionConnectionInfo partitionConnectionInfo;
 
        private final ResultPartitionID resultPartitionID;
 
+       private final boolean isBlocking;
+
        public NettyShuffleDescriptor(
                        ResourceID producerLocation,
                        PartitionConnectionInfo partitionConnectionInfo,
-                       ResultPartitionID resultPartitionID) {
+                       ResultPartitionID resultPartitionID,
+                       boolean isBlocking) {
                this.producerLocation = producerLocation;
                this.partitionConnectionInfo = partitionConnectionInfo;
                this.resultPartitionID = resultPartitionID;
+               this.isBlocking = isBlocking;
        }
 
        public ConnectionID getConnectionId() {
@@ -63,6 +74,12 @@ public class NettyShuffleDescriptor implements 
ShuffleDescriptor {
                return Optional.of(producerLocation);
        }
 
+       @Override
+       public EnumSet<ReleaseType> getSupportedReleaseTypes() {
+               return isBlocking ?
+                       SUPPORTED_RELEASE_TYPES_FOR_BLOCKING_PARTITIONS : 
SUPPORTED_RELEASE_TYPES_FOR_NON_BLOCKING_PARTITIONS;
+       }
+
        public boolean isLocalTo(ResourceID consumerLocation) {
                return producerLocation.equals(consumerLocation);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index 6c2cb32..c369ff1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -42,7 +42,8 @@ public enum NettyShuffleMaster implements 
ShuffleMaster<NettyShuffleDescriptor>
                NettyShuffleDescriptor shuffleDeploymentDescriptor = new 
NettyShuffleDescriptor(
                        producerDescriptor.getProducerLocation(),
                        createConnectionInfo(producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
-                       resultPartitionID);
+                       resultPartitionID,
+                       partitionDescriptor.getPartitionType().isBlocking());
 
                return 
CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
index 5e29472..5af56f2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -67,4 +68,32 @@ public interface ShuffleDescriptor extends Serializable {
         * @return the resource id of the producing task executor if the 
partition occupies local resources there
         */
        Optional<ResourceID> storesLocalResourcesOn();
+
+       /**
+        * Return release types supported by Shuffle Service for this partition.
+        */
+       EnumSet<ReleaseType> getSupportedReleaseTypes();
+
+       /**
+        * Partition release type.
+        */
+       enum ReleaseType {
+               /**
+                * Auto-release the partition after having been fully consumed 
once.
+                *
+                * <p>No additional actions required, like {@link 
ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
+                * or {@link 
ShuffleEnvironment#releasePartitionsLocally(Collection)}
+                */
+               AUTO,
+
+               /**
+                * Manually release the partition, the partition has to support 
consumption multiple times.
+                *
+                * <p>The partition requires manual release once all 
consumption is done:
+                * {@link 
ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and
+                * if the partition occupies producer local resources ({@link 
#storesLocalResourcesOn()}) then also
+                * {@link 
ShuffleEnvironment#releasePartitionsLocally(Collection)}.
+                */
+               MANUAL
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
index a206f15..ed66f2d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -63,12 +64,14 @@ import java.util.Collection;
  *     <li>if {@link 
ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true} 
and
  *     {@link ResultPartitionWriter#finish()} and {@link 
ResultPartitionWriter#close()} are called when the production is done.
  *     The actual release can take some time depending on implementation 
details,
- *     e.g. if the `end of consumption' confirmation from the consumer is 
being awaited implicitly.</li>
+ *     e.g. if the `end of consumption' confirmation from the consumer is 
being awaited implicitly.
+ *     The partition has to support the {@link ReleaseType#AUTO} in {@link 
ShuffleDescriptor#getSupportedReleaseTypes()}.</li>
  *     <li>if {@link 
ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code false} 
and
  *     {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and 
{@link ShuffleEnvironment#releasePartitionsLocally(Collection)},
  *     if it occupies any producer local resources ({@link 
ShuffleDescriptor#storesLocalResourcesOn()}),
  *     are called outside of the producer thread, e.g. to manage the lifecycle 
of BLOCKING result partitions
- *     which can outlive their producers.</li>
+ *     which can outlive their producers. The partition has to support the 
{@link ReleaseType#MANUAL} in
+ *     {@link ShuffleDescriptor#getSupportedReleaseTypes()}.</li>
  * </ol>
  * The partitions, which currently still occupy local resources, can be 
queried with
  * {@link ShuffleEnvironment#getPartitionsOccupyingLocalResources}.
@@ -130,6 +133,7 @@ public interface ShuffleEnvironment<P extends 
ResultPartitionWriter, G extends I
         * <p>This is called for partitions which occupy resources locally
         * (can be checked by {@link 
ShuffleDescriptor#storesLocalResourcesOn()}).
         * This method is not called if {@link 
ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
+        * The partition has to support the {@link ReleaseType#MANUAL} in 
{@link ShuffleDescriptor#getSupportedReleaseTypes()}.
         *
         * @param partitionIds identifying the partitions to be released
         */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
index 50bda70..a9ef1c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.shuffle;
 
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -51,6 +52,7 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> {
         * <p>This call triggers release of any resources which are occupied by 
the given partition in the external systems
         * outside of the producer executor. This is mostly relevant for the 
batch jobs and blocking result partitions.
         * This method is not called if {@link 
ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
+        * The partition has to support the {@link ReleaseType#MANUAL} in 
{@link ShuffleDescriptor#getSupportedReleaseTypes()}.
         * The producer local resources are managed by {@link 
ShuffleDescriptor#storesLocalResourcesOn()} and
         * {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
         *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
index 339f343..7c35516 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.shuffle;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
+import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -56,4 +57,9 @@ public final class UnknownShuffleDescriptor implements 
ShuffleDescriptor {
        public Optional<ResourceID> storesLocalResourcesOn() {
                return Optional.empty();
        }
+
+       @Override
+       public EnumSet<ReleaseType> getSupportedReleaseTypes() {
+               return EnumSet.noneOf(ReleaseType.class);
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 90301fc..1439f00 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
 import 
org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
@@ -89,7 +90,8 @@ public class ResultPartitionDeploymentDescriptorTest extends 
TestLogger {
                ShuffleDescriptor shuffleDescriptor = new 
NettyShuffleDescriptor(
                        producerLocation,
                        new NetworkPartitionConnectionInfo(connectionID),
-                       resultPartitionID);
+                       resultPartitionID,
+                       false);
 
                ResultPartitionDeploymentDescriptor copy =
                        
createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);
@@ -107,7 +109,7 @@ public class ResultPartitionDeploymentDescriptorTest 
extends TestLogger {
                for (ResultPartitionType partitionType : 
ResultPartitionType.values()) {
                        ResultPartitionDeploymentDescriptor partitionDescriptor 
= new ResultPartitionDeploymentDescriptor(
                                new PartitionDescriptor(resultId, partitionId, 
partitionType, numberOfSubpartitions, connectionIndex),
-                               
NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
+                               
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
                                1,
                                true
                        );
@@ -120,6 +122,16 @@ public class ResultPartitionDeploymentDescriptorTest 
extends TestLogger {
                }
        }
 
+       @Test(expected = IllegalArgumentException.class)
+       public void testIncompatibleReleaseTypeManual() {
+               new ResultPartitionDeploymentDescriptor(
+                       partitionDescriptor,
+                       
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(false).buildLocal(),
+                       1,
+                       true,
+                       ReleaseType.MANUAL);
+       }
+
        private static ResultPartitionDeploymentDescriptor 
createCopyAndVerifyResultPartitionDeploymentDescriptor(
                        ShuffleDescriptor shuffleDescriptor) throws IOException 
{
                ResultPartitionDeploymentDescriptor orig = new 
ResultPartitionDeploymentDescriptor(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 9fb83a7..870a745 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -71,7 +71,7 @@ public class PartitionTestUtils {
        }
 
        public static ResultPartitionDeploymentDescriptor 
createPartitionDeploymentDescriptor(ResultPartitionType partitionType) {
-               ShuffleDescriptor shuffleDescriptor = 
NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
+               ShuffleDescriptor shuffleDescriptor = 
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal();
                PartitionDescriptor partitionDescriptor = new 
PartitionDescriptor(
                        new IntermediateDataSetID(),
                        
shuffleDescriptor.getResultPartitionID().getPartitionId(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index a409a9c..1de51fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -58,14 +58,15 @@ public class ResultPartitionFactoryTest extends TestLogger {
                        forceConsumptionOnRelease
                );
 
+               ResultPartitionType partitionType = 
ResultPartitionType.BLOCKING;
                final ResultPartitionDeploymentDescriptor descriptor = new 
ResultPartitionDeploymentDescriptor(
                        new PartitionDescriptor(
                                new IntermediateDataSetID(),
                                new IntermediateResultPartitionID(),
-                               ResultPartitionType.BLOCKING,
+                               partitionType,
                                1,
                                0),
-                       NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
+                       
NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
                        1,
                        true
                );
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
index 0dcccaf..0ecd81a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
@@ -40,6 +40,7 @@ public class NettyShuffleDescriptorBuilder {
        private InetAddress address = InetAddress.getLoopbackAddress();
        private int dataPort = 0;
        private int connectionIndex = 0;
+       private boolean isBlocking;
 
        public NettyShuffleDescriptorBuilder setProducerLocation(ResourceID 
producerLocation) {
                this.producerLocation = producerLocation;
@@ -73,19 +74,26 @@ public class NettyShuffleDescriptorBuilder {
                return this;
        }
 
+       public NettyShuffleDescriptorBuilder setBlocking(boolean isBlocking) {
+               this.isBlocking = isBlocking;
+               return this;
+       }
+
        public NettyShuffleDescriptor buildRemote() {
                ConnectionID connectionID = new ConnectionID(new 
InetSocketAddress(address, dataPort), connectionIndex);
                return new NettyShuffleDescriptor(
                        producerLocation,
                        new NetworkPartitionConnectionInfo(connectionID),
-                       id);
+                       id,
+                       isBlocking);
        }
 
        public NettyShuffleDescriptor buildLocal() {
                return new NettyShuffleDescriptor(
                        producerLocation,
                        LocalExecutionPartitionConnectionInfo.INSTANCE,
-                       id);
+                       id,
+                       isBlocking);
        }
 
        public static NettyShuffleDescriptorBuilder newBuilder() {

Reply via email to