This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac0e20499d62467e279a3ce6bc305a3f879fbab4
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Mon Jul 29 17:27:50 2019 +0300

    [hotfix][network] fix codestyle issues in ResultPartitionFactory
---
 .../network/partition/ResultPartitionFactory.java  | 49 ++++++++++------------
 1 file changed, 21 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index b390987..0656e6e 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -33,8 +33,6 @@ import org.apache.flink.util.function.FunctionWithException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.Optional;
@@ -46,13 +44,10 @@ public class ResultPartitionFactory {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ResultPartitionFactory.class);
 
-       @Nonnull
        private final ResultPartitionManager partitionManager;
 
-       @Nonnull
        private final FileChannelManager channelManager;
 
-       @Nonnull
        private final BufferPoolFactory bufferPoolFactory;
 
        private final BoundedBlockingSubpartitionType blockingSubpartitionType;
@@ -66,9 +61,9 @@ public class ResultPartitionFactory {
        private final boolean forcePartitionReleaseOnConsumption;
 
        public ResultPartitionFactory(
-               @Nonnull ResultPartitionManager partitionManager,
-               @Nonnull FileChannelManager channelManager,
-               @Nonnull BufferPoolFactory bufferPoolFactory,
+               ResultPartitionManager partitionManager,
+               FileChannelManager channelManager,
+               BufferPoolFactory bufferPoolFactory,
                BoundedBlockingSubpartitionType blockingSubpartitionType,
                int networkBuffersPerChannel,
                int floatingNetworkBuffersPerGate,
@@ -86,9 +81,8 @@ public class ResultPartitionFactory {
        }
 
        public ResultPartition create(
-               @Nonnull String taskNameWithSubtaskAndId,
-               @Nonnull ResultPartitionDeploymentDescriptor desc) {
-
+                       String taskNameWithSubtaskAndId,
+                       ResultPartitionDeploymentDescriptor desc) {
                return create(
                        taskNameWithSubtaskAndId,
                        desc.getShuffleDescriptor().getResultPartitionID(),
@@ -100,13 +94,12 @@ public class ResultPartitionFactory {
 
        @VisibleForTesting
        public ResultPartition create(
-               @Nonnull String taskNameWithSubtaskAndId,
-               @Nonnull ResultPartitionID id,
-               @Nonnull ResultPartitionType type,
-               int numberOfSubpartitions,
-               int maxParallelism,
-               FunctionWithException<BufferPoolOwner, BufferPool, IOException> 
bufferPoolFactory) {
-
+                       String taskNameWithSubtaskAndId,
+                       ResultPartitionID id,
+                       ResultPartitionType type,
+                       int numberOfSubpartitions,
+                       int maxParallelism,
+                       FunctionWithException<BufferPoolOwner, BufferPool, 
IOException> bufferPoolFactory) {
                ResultSubpartition[] subpartitions = new 
ResultSubpartition[numberOfSubpartitions];
 
                ResultPartition partition = forcePartitionReleaseOnConsumption 
|| !type.isBlocking()
@@ -139,10 +132,10 @@ public class ResultPartitionFactory {
                        ResultPartitionType type,
                        BoundedBlockingSubpartitionType 
blockingSubpartitionType,
                        ResultSubpartition[] subpartitions) {
-
                // Create the subpartitions.
                switch (type) {
                        case BLOCKING:
+                       case BLOCKING_PERSISTENT:
                                
initializeBoundedBlockingPartitions(subpartitions, partition, 
blockingSubpartitionType, networkBufferSize, channelManager);
                                break;
 
@@ -160,15 +153,14 @@ public class ResultPartitionFactory {
        }
 
        private static void initializeBoundedBlockingPartitions(
-               ResultSubpartition[] subpartitions,
-               ResultPartition parent,
-               BoundedBlockingSubpartitionType blockingSubpartitionType,
-               int networkBufferSize,
-               FileChannelManager channelManager) {
-
+                       ResultSubpartition[] subpartitions,
+                       ResultPartition parent,
+                       BoundedBlockingSubpartitionType 
blockingSubpartitionType,
+                       int networkBufferSize,
+                       FileChannelManager channelManager) {
                int i = 0;
                try {
-                       for (; i < subpartitions.length; i++) {
+                       for (i = 0; i < subpartitions.length; i++) {
                                final File spillFile = 
channelManager.createChannel().getPathFile();
                                subpartitions[i] = 
blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize);
                        }
@@ -194,8 +186,8 @@ public class ResultPartitionFactory {
 
        @VisibleForTesting
        FunctionWithException<BufferPoolOwner, BufferPool, IOException> 
createBufferPoolFactory(
-               int numberOfSubpartitions, ResultPartitionType type) {
-
+                       int numberOfSubpartitions,
+                       ResultPartitionType type) {
                return p -> {
                        int maxNumberOfMemorySegments = type.isBounded() ?
                                numberOfSubpartitions * 
networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
@@ -213,6 +205,7 @@ public class ResultPartitionFactory {
                                return 
BoundedBlockingSubpartitionType.FILE_MMAP;
                        case _32_BIT:
                                return BoundedBlockingSubpartitionType.FILE;
+                       case UNKNOWN:
                        default:
                                LOG.warn("Cannot determine memory architecture. 
Using pure file-based shuffle.");
                                return BoundedBlockingSubpartitionType.FILE;

Reply via email to