This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ac0e20499d62467e279a3ce6bc305a3f879fbab4 Author: Andrey Zagrebin <[email protected]> AuthorDate: Mon Jul 29 17:27:50 2019 +0300 [hotfix][network] fix codestyle issues in ResultPartitionFactory --- .../network/partition/ResultPartitionFactory.java | 49 ++++++++++------------ 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java index b390987..0656e6e 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java @@ -33,8 +33,6 @@ import org.apache.flink.util.function.FunctionWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; - import java.io.File; import java.io.IOException; import java.util.Optional; @@ -46,13 +44,10 @@ public class ResultPartitionFactory { private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionFactory.class); - @Nonnull private final ResultPartitionManager partitionManager; - @Nonnull private final FileChannelManager channelManager; - @Nonnull private final BufferPoolFactory bufferPoolFactory; private final BoundedBlockingSubpartitionType blockingSubpartitionType; @@ -66,9 +61,9 @@ public class ResultPartitionFactory { private final boolean forcePartitionReleaseOnConsumption; public ResultPartitionFactory( - @Nonnull ResultPartitionManager partitionManager, - @Nonnull FileChannelManager channelManager, - @Nonnull BufferPoolFactory bufferPoolFactory, + ResultPartitionManager partitionManager, + FileChannelManager channelManager, + BufferPoolFactory bufferPoolFactory, BoundedBlockingSubpartitionType blockingSubpartitionType, int networkBuffersPerChannel, int floatingNetworkBuffersPerGate, @@ -86,9 +81,8 
@@ public class ResultPartitionFactory { } public ResultPartition create( - @Nonnull String taskNameWithSubtaskAndId, - @Nonnull ResultPartitionDeploymentDescriptor desc) { - + String taskNameWithSubtaskAndId, + ResultPartitionDeploymentDescriptor desc) { return create( taskNameWithSubtaskAndId, desc.getShuffleDescriptor().getResultPartitionID(), @@ -100,13 +94,12 @@ public class ResultPartitionFactory { @VisibleForTesting public ResultPartition create( - @Nonnull String taskNameWithSubtaskAndId, - @Nonnull ResultPartitionID id, - @Nonnull ResultPartitionType type, - int numberOfSubpartitions, - int maxParallelism, - FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) { - + String taskNameWithSubtaskAndId, + ResultPartitionID id, + ResultPartitionType type, + int numberOfSubpartitions, + int maxParallelism, + FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) { ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions]; ResultPartition partition = forcePartitionReleaseOnConsumption || !type.isBlocking() @@ -139,10 +132,10 @@ public class ResultPartitionFactory { ResultPartitionType type, BoundedBlockingSubpartitionType blockingSubpartitionType, ResultSubpartition[] subpartitions) { - // Create the subpartitions. 
switch (type) { case BLOCKING: + case BLOCKING_PERSISTENT: initializeBoundedBlockingPartitions(subpartitions, partition, blockingSubpartitionType, networkBufferSize, channelManager); break; @@ -160,15 +153,14 @@ public class ResultPartitionFactory { } private static void initializeBoundedBlockingPartitions( - ResultSubpartition[] subpartitions, - ResultPartition parent, - BoundedBlockingSubpartitionType blockingSubpartitionType, - int networkBufferSize, - FileChannelManager channelManager) { - + ResultSubpartition[] subpartitions, + ResultPartition parent, + BoundedBlockingSubpartitionType blockingSubpartitionType, + int networkBufferSize, + FileChannelManager channelManager) { int i = 0; try { - for (; i < subpartitions.length; i++) { + for (i = 0; i < subpartitions.length; i++) { final File spillFile = channelManager.createChannel().getPathFile(); subpartitions[i] = blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize); } @@ -194,8 +186,8 @@ public class ResultPartitionFactory { @VisibleForTesting FunctionWithException<BufferPoolOwner, BufferPool, IOException> createBufferPoolFactory( - int numberOfSubpartitions, ResultPartitionType type) { - + int numberOfSubpartitions, + ResultPartitionType type) { return p -> { int maxNumberOfMemorySegments = type.isBounded() ? numberOfSubpartitions * networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE; @@ -213,6 +205,7 @@ public class ResultPartitionFactory { return BoundedBlockingSubpartitionType.FILE_MMAP; case _32_BIT: return BoundedBlockingSubpartitionType.FILE; + case UNKNOWN: default: LOG.warn("Cannot determine memory architecture. Using pure file-based shuffle."); return BoundedBlockingSubpartitionType.FILE;
