This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d1ce8047ef2c9d04fb460448cd4dfc35c29602ae
Author:     Andrey Zagrebin <[email protected]>
AuthorDate: Tue Jun 25 11:29:45 2019 +0200

    [FLINK-12960][coordination][shuffle] Introduce ShuffleDescriptor#ReleaseType and ShuffleDescriptor#getSupportedReleaseTypes

    `ResultPartitionDeploymentDescriptor#releasedOnConsumption` expresses how the
    shuffle user intends to consume and release the partition. The
    `ShuffleDescriptor` should provide a way to query which release types the
    shuffle service supports for this partition. If the requested release type is
    not supported by the shuffle service for a certain type of partition, the job
    should fail fast.
---
 .../ResultPartitionDeploymentDescriptor.java       | 44 ++++++++++++++++++++--
 .../runtime/shuffle/NettyShuffleDescriptor.java    | 19 +++++++++-
 .../flink/runtime/shuffle/NettyShuffleMaster.java  |  3 +-
 .../flink/runtime/shuffle/ShuffleDescriptor.java   | 29 ++++++++++++++
 .../flink/runtime/shuffle/ShuffleEnvironment.java  |  8 +++-
 .../flink/runtime/shuffle/ShuffleMaster.java       |  2 +
 .../runtime/shuffle/UnknownShuffleDescriptor.java  |  6 +++
 .../ResultPartitionDeploymentDescriptorTest.java   | 16 +++++++-
 .../io/network/partition/PartitionTestUtils.java   |  2 +-
 .../partition/ResultPartitionFactoryTest.java      |  5 ++-
 .../util/NettyShuffleDescriptorBuilder.java        | 12 +++++-
 11 files changed, 131 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index d627de3..164e960 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -31,6 +32,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import java.io.Serializable;
 import java.util.Collection;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -51,20 +53,49 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 	/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
 	private final boolean sendScheduleOrUpdateConsumersMessage;
 
-	/** Whether the result partition is released on consumption. */
-	private final boolean releasedOnConsumption;
+	private final ReleaseType releaseType;
 
 	public ResultPartitionDeploymentDescriptor(
 			PartitionDescriptor partitionDescriptor,
 			ShuffleDescriptor shuffleDescriptor,
 			int maxParallelism,
 			boolean sendScheduleOrUpdateConsumersMessage) {
+		this(
+			checkNotNull(partitionDescriptor),
+			shuffleDescriptor,
+			maxParallelism,
+			sendScheduleOrUpdateConsumersMessage,
+			// Later we might have to make the scheduling adjust automatically
+			// if certain release type is not supported by shuffle service implementation at hand
+			partitionDescriptor.getPartitionType() == ResultPartitionType.BLOCKING ? ReleaseType.MANUAL : ReleaseType.AUTO);
+	}
+
+	public ResultPartitionDeploymentDescriptor(
+			PartitionDescriptor partitionDescriptor,
+			ShuffleDescriptor shuffleDescriptor,
+			int maxParallelism,
+			boolean sendScheduleOrUpdateConsumersMessage,
+			ReleaseType releaseType) {
+		checkReleaseOnConsumptionIsSupportedForPartition(shuffleDescriptor, releaseType);
 		this.partitionDescriptor = checkNotNull(partitionDescriptor);
 		this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
 		KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
 		this.maxParallelism = maxParallelism;
 		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
-		this.releasedOnConsumption = partitionDescriptor.getPartitionType() != ResultPartitionType.BLOCKING;
+		this.releaseType = releaseType;
+	}
+
+	private static void checkReleaseOnConsumptionIsSupportedForPartition(
+			ShuffleDescriptor shuffleDescriptor,
+			ReleaseType releaseType) {
+		checkNotNull(shuffleDescriptor);
+		checkArgument(
+			shuffleDescriptor.getSupportedReleaseTypes().contains(releaseType),
+			"Release type %s is not supported by the shuffle service for this partition" +
+				"(id: %s), supported release types: %s",
+			releaseType,
+			shuffleDescriptor.getResultPartitionID(),
+			shuffleDescriptor.getSupportedReleaseTypes());
 	}
 
 	public IntermediateDataSetID getResultId() {
@@ -103,10 +134,15 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 	 * by {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
 	 * and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
 	 *
+	 * <p>The partition has to support the corresponding {@link ReleaseType} in
+	 * {@link ShuffleDescriptor#getSupportedReleaseTypes()}:
+	 * {@link ReleaseType#AUTO} for {@code isReleasedOnConsumption()} to return {@code true} and
+	 * {@link ReleaseType#MANUAL} for {@code isReleasedOnConsumption()} to return {@code false}.
+	 *
 	 * @return whether to release the partition after having been fully consumed once.
 	 */
 	public boolean isReleasedOnConsumption() {
-		return releasedOnConsumption;
+		return releaseType == ReleaseType.AUTO;
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
index f758bcc..8086b27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import java.io.Serializable;
 import java.net.InetSocketAddress;
+import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -34,19 +35,29 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
 
 	private static final long serialVersionUID = 852181945034989215L;
 
+	private static final EnumSet<ReleaseType> SUPPORTED_RELEASE_TYPES_FOR_BLOCKING_PARTITIONS =
+		EnumSet.of(ReleaseType.AUTO, ReleaseType.MANUAL);
+
+	private static final EnumSet<ReleaseType> SUPPORTED_RELEASE_TYPES_FOR_NON_BLOCKING_PARTITIONS =
+		EnumSet.of(ReleaseType.AUTO);
+
 	private final ResourceID producerLocation;
 
 	private final PartitionConnectionInfo partitionConnectionInfo;
 
 	private final ResultPartitionID resultPartitionID;
 
+	private final boolean isBlocking;
+
 	public NettyShuffleDescriptor(
 			ResourceID producerLocation,
 			PartitionConnectionInfo partitionConnectionInfo,
-			ResultPartitionID resultPartitionID) {
+			ResultPartitionID resultPartitionID,
+			boolean isBlocking) {
 		this.producerLocation = producerLocation;
 		this.partitionConnectionInfo = partitionConnectionInfo;
 		this.resultPartitionID = resultPartitionID;
+		this.isBlocking = isBlocking;
 	}
 
 	public ConnectionID getConnectionId() {
@@ -63,6 +74,12 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
 		return Optional.of(producerLocation);
 	}
 
+	@Override
+	public EnumSet<ReleaseType> getSupportedReleaseTypes() {
+		return isBlocking ?
+			SUPPORTED_RELEASE_TYPES_FOR_BLOCKING_PARTITIONS : SUPPORTED_RELEASE_TYPES_FOR_NON_BLOCKING_PARTITIONS;
+	}
+
 	public boolean isLocalTo(ResourceID consumerLocation) {
 		return producerLocation.equals(consumerLocation);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index 6c2cb32..c369ff1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -42,7 +42,8 @@ public enum NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 		NettyShuffleDescriptor shuffleDeploymentDescriptor = new NettyShuffleDescriptor(
 			producerDescriptor.getProducerLocation(),
 			createConnectionInfo(producerDescriptor, partitionDescriptor.getConnectionIndex()),
-			resultPartitionID);
+			resultPartitionID,
+			partitionDescriptor.getPartitionType().isBlocking());
 
 		return CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
index 5e29472..5af56f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -67,4 +68,32 @@ public interface ShuffleDescriptor extends Serializable {
 	 * @return the resource id of the producing task executor if the partition occupies local resources there
 	 */
 	Optional<ResourceID> storesLocalResourcesOn();
+
+	/**
+	 * Return release types supported by Shuffle Service for this partition.
+	 */
+	EnumSet<ReleaseType> getSupportedReleaseTypes();
+
+	/**
+	 * Partition release type.
+	 */
+	enum ReleaseType {
+		/**
+		 * Auto-release the partition after having been fully consumed once.
+		 *
+		 * <p>No additional actions required, like {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)}
+		 * or {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}
+		 */
+		AUTO,
+
+		/**
+		 * Manually release the partition, the partition has to support consumption multiple times.
+		 *
+		 * <p>The partition requires manual release once all consumption is done:
+		 * {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and
+		 * if the partition occupies producer local resources ({@link #storesLocalResourcesOn()}) then also
+		 * {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
+		 */
+		MANUAL
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
index a206f15..ed66f2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleEnvironment.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -63,12 +64,14 @@ import java.util.Collection;
 * <li>if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true} and
 * {@link ResultPartitionWriter#finish()} and {@link ResultPartitionWriter#close()} are called when the production is done.
 * The actual release can take some time depending on implementation details,
-* e.g. if the `end of consumption' confirmation from the consumer is being awaited implicitly.</li>
+* e.g. if the `end of consumption' confirmation from the consumer is being awaited implicitly.
+* The partition has to support the {@link ReleaseType#AUTO} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}.</li>
 * <li>if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code false} and
 * {@link ShuffleMaster#releasePartitionExternally(ShuffleDescriptor)} and {@link ShuffleEnvironment#releasePartitionsLocally(Collection)},
 * if it occupies any producer local resources ({@link ShuffleDescriptor#storesLocalResourcesOn()}),
 * are called outside of the producer thread, e.g. to manage the lifecycle of BLOCKING result partitions
-* which can outlive their producers.</li>
+* which can outlive their producers. The partition has to support the {@link ReleaseType#MANUAL} in
+* {@link ShuffleDescriptor#getSupportedReleaseTypes()}.</li>
 * </ol>
 * The partitions, which currently still occupy local resources, can be queried with
 * {@link ShuffleEnvironment#getPartitionsOccupyingLocalResources}.
@@ -130,6 +133,7 @@ public interface ShuffleEnvironment<P extends ResultPartitionWriter, G extends I
 	 * <p>This is called for partitions which occupy resources locally
 	 * (can be checked by {@link ShuffleDescriptor#storesLocalResourcesOn()}).
 	 * This method is not called if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
+	 * The partition has to support the {@link ReleaseType#MANUAL} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}.
 	 *
 	 * @param partitionIds identifying the partitions to be released
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
index 50bda70..a9ef1c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.shuffle;
 
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -51,6 +52,7 @@ public interface ShuffleMaster<T extends ShuffleDescriptor> {
 	 * <p>This call triggers release of any resources which are occupied by the given partition in the external systems
 	 * outside of the producer executor. This is mostly relevant for the batch jobs and blocking result partitions.
 	 * This method is not called if {@link ResultPartitionDeploymentDescriptor#isReleasedOnConsumption()} is {@code true}.
+	 * The partition has to support the {@link ReleaseType#MANUAL} in {@link ShuffleDescriptor#getSupportedReleaseTypes()}.
 	 * The producer local resources are managed by {@link ShuffleDescriptor#storesLocalResourcesOn()} and
 	 * {@link ShuffleEnvironment#releasePartitionsLocally(Collection)}.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
index 339f343..7c35516 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/UnknownShuffleDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.shuffle;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
+import java.util.EnumSet;
 import java.util.Optional;
 
 /**
@@ -56,4 +57,9 @@ public final class UnknownShuffleDescriptor implements ShuffleDescriptor {
 	public Optional<ResourceID> storesLocalResourcesOn() {
 		return Optional.empty();
 	}
+
+	@Override
+	public EnumSet<ReleaseType> getSupportedReleaseTypes() {
+		return EnumSet.noneOf(ReleaseType.class);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 90301fc..1439f00 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor.ReleaseType;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.apache.flink.util.TestLogger;
@@ -89,7 +90,8 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 		ShuffleDescriptor shuffleDescriptor = new NettyShuffleDescriptor(
 			producerLocation,
 			new NetworkPartitionConnectionInfo(connectionID),
-			resultPartitionID);
+			resultPartitionID,
+			false);
 
 		ResultPartitionDeploymentDescriptor copy =
 			createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);
@@ -107,7 +109,7 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 		for (ResultPartitionType partitionType : ResultPartitionType.values()) {
 			ResultPartitionDeploymentDescriptor partitionDescriptor = new ResultPartitionDeploymentDescriptor(
 				new PartitionDescriptor(resultId, partitionId, partitionType, numberOfSubpartitions, connectionIndex),
-				NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
+				NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
 				1,
 				true
 			);
@@ -120,6 +122,16 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 		}
 	}
 
+	@Test(expected = IllegalArgumentException.class)
+	public void testIncompatibleReleaseTypeManual() {
+		new ResultPartitionDeploymentDescriptor(
+			partitionDescriptor,
+			NettyShuffleDescriptorBuilder.newBuilder().setBlocking(false).buildLocal(),
+			1,
+			true,
+			ReleaseType.MANUAL);
+	}
+
 	private static ResultPartitionDeploymentDescriptor createCopyAndVerifyResultPartitionDeploymentDescriptor(
 			ShuffleDescriptor shuffleDescriptor) throws IOException {
 		ResultPartitionDeploymentDescriptor orig = new ResultPartitionDeploymentDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 9fb83a7..870a745 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -71,7 +71,7 @@ public class PartitionTestUtils {
 	}
 
 	public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor(ResultPartitionType partitionType) {
-		ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
+		ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal();
 		PartitionDescriptor partitionDescriptor = new PartitionDescriptor(
 			new IntermediateDataSetID(),
 			shuffleDescriptor.getResultPartitionID().getPartitionId(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index a409a9c..1de51fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -58,14 +58,15 @@ public class ResultPartitionFactoryTest extends TestLogger {
 			forceConsumptionOnRelease
 		);
+		ResultPartitionType partitionType = ResultPartitionType.BLOCKING;
 		final ResultPartitionDeploymentDescriptor descriptor = new ResultPartitionDeploymentDescriptor(
 			new PartitionDescriptor(
 				new IntermediateDataSetID(),
 				new IntermediateResultPartitionID(),
-				ResultPartitionType.BLOCKING,
+				partitionType,
 				1,
 				0),
-			NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
+			NettyShuffleDescriptorBuilder.newBuilder().setBlocking(partitionType.isBlocking()).buildLocal(),
 			1,
 			true
 		);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
index 0dcccaf..0ecd81a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
@@ -40,6 +40,7 @@ public class NettyShuffleDescriptorBuilder {
 	private InetAddress address = InetAddress.getLoopbackAddress();
 	private int dataPort = 0;
 	private int connectionIndex = 0;
+	private boolean isBlocking;
 
 	public NettyShuffleDescriptorBuilder setProducerLocation(ResourceID producerLocation) {
 		this.producerLocation = producerLocation;
@@ -73,19 +74,26 @@ public class NettyShuffleDescriptorBuilder {
 		return this;
 	}
 
+	public NettyShuffleDescriptorBuilder setBlocking(boolean isBlocking) {
+		this.isBlocking = isBlocking;
+		return this;
+	}
+
 	public NettyShuffleDescriptor buildRemote() {
 		ConnectionID connectionID = new ConnectionID(new InetSocketAddress(address, dataPort), connectionIndex);
 		return new NettyShuffleDescriptor(
 			producerLocation,
 			new NetworkPartitionConnectionInfo(connectionID),
-			id);
+			id,
+			isBlocking);
 	}
 
 	public NettyShuffleDescriptor buildLocal() {
 		return new NettyShuffleDescriptor(
 			producerLocation,
 			LocalExecutionPartitionConnectionInfo.INSTANCE,
-			id);
+			id,
+			isBlocking);
 	}
 
 	public static NettyShuffleDescriptorBuilder newBuilder() {
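
For implementers of a custom shuffle service, the sketch below shows how a descriptor
might advertise its supported release types through the new
ShuffleDescriptor#getSupportedReleaseTypes() method. It is a minimal sketch, not part
of this commit: the class name ExternalShuffleDescriptor and its package are
hypothetical, and it assumes a shuffle service that keeps partition data outside the
producer and therefore only supports MANUAL release.

    // Hypothetical example; assumes flink-runtime with this commit on the classpath.
    package org.apache.flink.example;

    import java.util.EnumSet;
    import java.util.Optional;

    import org.apache.flink.runtime.clusterframework.types.ResourceID;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
    import org.apache.flink.runtime.shuffle.ShuffleDescriptor;

    public class ExternalShuffleDescriptor implements ShuffleDescriptor {

        private static final long serialVersionUID = 1L;

        private final ResultPartitionID resultPartitionID;

        public ExternalShuffleDescriptor(ResultPartitionID resultPartitionID) {
            this.resultPartitionID = resultPartitionID;
        }

        @Override
        public ResultPartitionID getResultPartitionID() {
            return resultPartitionID;
        }

        @Override
        public Optional<ResourceID> storesLocalResourcesOn() {
            // The data lives in an external system, not on the producer's task executor.
            return Optional.empty();
        }

        @Override
        public EnumSet<ReleaseType> getSupportedReleaseTypes() {
            // Only manual release: the partition outlives consumption until
            // ShuffleMaster#releasePartitionExternally(ShuffleDescriptor) is called.
            return EnumSet.of(ReleaseType.MANUAL);
        }
    }

With such a descriptor, constructing a ResultPartitionDeploymentDescriptor with
ReleaseType.AUTO would fail fast with an IllegalArgumentException in
checkReleaseOnConsumptionIsSupportedForPartition, mirroring what the new
testIncompatibleReleaseTypeManual test checks for the Netty-based descriptor.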
