Repository: flink
Updated Branches:
  refs/heads/master 3039df809 -> a9743eb68


[FLINK-7920] Make MiniClusterConfiguration immutable

This closes #4905.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9743eb6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9743eb6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9743eb6

Branch: refs/heads/master
Commit: a9743eb6850809784930c1199741aa83ae54e99a
Parents: 3039df8
Author: Till Rohrmann <[email protected]>
Authored: Wed Oct 25 17:31:45 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu Oct 26 10:22:22 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  |  84 ++-----
 .../minicluster/MiniClusterConfiguration.java   | 227 +++++++++----------
 .../runtime/minicluster/MiniClusterITCase.java  |  20 +-
 .../Flip6LocalStreamEnvironment.java            |   8 +-
 4 files changed, 134 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a9743eb6/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index f293a01..dd352bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -18,12 +18,9 @@
 
 package org.apache.flink.runtime.minicluster;
 
-import akka.actor.ActorSystem;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -47,6 +44,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.util.ExceptionUtils;
 
+import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +65,7 @@ public class MiniCluster {
        private final Object lock = new Object();
 
        /** The configuration for this mini cluster */
-       private final MiniClusterConfiguration config;
+       private final MiniClusterConfiguration miniClusterConfiguration;
 
        @GuardedBy("lock") 
        private MetricRegistry metricRegistry;
@@ -107,48 +105,12 @@ public class MiniCluster {
        // 
------------------------------------------------------------------------
 
        /**
-        * Creates a new mini cluster with the default configuration:
-        * <ul>
-        *     <li>One JobManager</li>
-        *     <li>One TaskManager</li>
-        *     <li>One task slot in the TaskManager</li>
-        *     <li>All components share the same RPC subsystem (minimizes 
communication overhead)</li>
-        * </ul>
-        */
-       public MiniCluster() {
-               this(new MiniClusterConfiguration());
-       }
-
-       /**
         * Creates a new Flink mini cluster based on the given configuration.
         * 
-        * @param config The configuration for the mini cluster
+        * @param miniClusterConfiguration The configuration for the mini 
cluster
         */
-       public MiniCluster(MiniClusterConfiguration config) {
-               this.config = checkNotNull(config, "config may not be null");
-       }
-
-       /**
-        * Creates a mini cluster based on the given configuration.
-        * 
-        * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} 
instead. 
-        * @see #MiniCluster(MiniClusterConfiguration)
-        */
-       @Deprecated
-       public MiniCluster(Configuration config) {
-               this(createConfig(config, true));
-       }
-
-       /**
-        * Creates a mini cluster based on the given configuration, starting 
one or more
-        * RPC services, depending on the given flag.
-        *
-        * @deprecated Use {@link #MiniCluster(MiniClusterConfiguration)} 
instead. 
-        * @see #MiniCluster(MiniClusterConfiguration)
-        */
-       @Deprecated
-       public MiniCluster(Configuration config, boolean singleRpcService) {
-               this(createConfig(config, singleRpcService));
+       public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
+               this.miniClusterConfiguration = 
checkNotNull(miniClusterConfiguration, "config may not be null");
 
                running = false;
        }
@@ -175,14 +137,14 @@ public class MiniCluster {
                        checkState(!running, "FlinkMiniCluster is already 
running");
 
                        LOG.info("Starting Flink Mini Cluster");
-                       LOG.debug("Using configuration {}", config);
+                       LOG.debug("Using configuration {}", 
miniClusterConfiguration);
 
-                       final Configuration configuration = new 
UnmodifiableConfiguration(config.generateConfiguration());
-                       final Time rpcTimeout = config.getRpcTimeout();
-                       final int numJobManagers = config.getNumJobManagers();
-                       final int numTaskManagers = config.getNumTaskManagers();
-                       final int numResourceManagers = 
config.getNumResourceManagers();
-                       final boolean singleRpc = 
config.getUseSingleRpcSystem();
+                       final Configuration configuration = 
miniClusterConfiguration.getConfiguration();
+                       final Time rpcTimeout = 
miniClusterConfiguration.getRpcTimeout();
+                       final int numJobManagers = 
miniClusterConfiguration.getNumJobManagers();
+                       final int numTaskManagers = 
miniClusterConfiguration.getNumTaskManagers();
+                       final int numResourceManagers = 
miniClusterConfiguration.getNumResourceManagers();
+                       final boolean useSingleRpcService = 
miniClusterConfiguration.getRpcServiceSharing() == 
MiniClusterConfiguration.RpcServiceSharing.SHARED;
 
                        try {
                                LOG.info("Starting Metrics Registry");
@@ -198,7 +160,7 @@ public class MiniCluster {
                                // we always need the 'commonRpcService' for 
auxiliary calls
                                commonRpcService = 
createRpcService(configuration, rpcTimeout, false, null);
 
-                               if (singleRpc) {
+                               if (useSingleRpcService) {
                                        // set that same RPC service for all 
JobManagers and TaskManagers
                                        for (int i = 0; i < numJobManagers; 
i++) {
                                                jobManagerRpcServices[i] = 
commonRpcService;
@@ -216,9 +178,9 @@ public class MiniCluster {
                                }
                                else {
                                        // start a new service per component, 
possibly with custom bind addresses
-                                       final String jobManagerBindAddress = 
config.getJobManagerBindAddress();
-                                       final String taskManagerBindAddress = 
config.getTaskManagerBindAddress();
-                                       final String resourceManagerBindAddress 
= config.getResourceManagerBindAddress();
+                                       final String jobManagerBindAddress = 
miniClusterConfiguration.getJobManagerBindAddress();
+                                       final String taskManagerBindAddress = 
miniClusterConfiguration.getTaskManagerBindAddress();
+                                       final String resourceManagerBindAddress 
= miniClusterConfiguration.getResourceManagerBindAddress();
 
                                        for (int i = 0; i < numJobManagers; 
i++) {
                                                jobManagerRpcServices[i] = 
createRpcService(
@@ -625,20 +587,6 @@ public class MiniCluster {
                return priorException;
        }
 
-       private static MiniClusterConfiguration createConfig(Configuration cfg, 
boolean singleRpcService) {
-               MiniClusterConfiguration config = cfg == null ?
-                               new MiniClusterConfiguration() :
-                               new MiniClusterConfiguration(cfg);
-
-               if (singleRpcService) {
-                       config.setUseSingleRpcService();
-               } else {
-                       config.setUseRpcServicePerComponent();
-               }
-
-               return config;
-       }
-
        private class TerminatingFatalErrorHandler implements FatalErrorHandler 
{
 
                private final int index;

http://git-wip-us.apache.org/repos/asf/flink/blob/a9743eb6/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index aa9b0c2..52e037c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -22,97 +22,60 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
-import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.flink.runtime.minicluster.MiniClusterConfiguration.RpcServiceSharing.SHARED;
 
+/**
+ * Configuration object for the {@link MiniCluster}.
+ */
 public class MiniClusterConfiguration {
 
-       private final Configuration config;
+       private final UnmodifiableConfiguration configuration;
 
-       private boolean singleRpcService = true;
+       private final int numJobManagers;
 
-       private int numJobManagers = 1;
+       private final int numTaskManagers;
 
-       private int numTaskManagers = 1;
+       private final int numResourceManagers;
 
-       private int numResourceManagers = 1;
+       private final RpcServiceSharing rpcServiceSharing;
 
-       private String commonBindAddress;
-
-       private long managedMemoryPerTaskManager = -1;
+       @Nullable
+       private final String commonBindAddress;
 
        // 
------------------------------------------------------------------------
        //  Construction
        // 
------------------------------------------------------------------------
 
-       public MiniClusterConfiguration() {
-               this.config = new Configuration();
-       }
-
-       public MiniClusterConfiguration(Configuration config) {
-               checkNotNull(config);
-               this.config = new Configuration(config);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  setters
-       // 
------------------------------------------------------------------------
-
-       public void addConfiguration(Configuration config) {
-               checkNotNull(config, "configuration must not be null");
-               this.config.addAll(config);
-       }
+       public MiniClusterConfiguration(
+                       Configuration configuration,
+                       int numJobManagers,
+                       int numTaskManagers,
+                       int numResourceManagers,
+                       RpcServiceSharing rpcServiceSharing,
+                       @Nullable String commonBindAddress) {
 
-       public void setUseSingleRpcService() {
-               this.singleRpcService = true;
-       }
-
-       public void setUseRpcServicePerComponent() {
-               this.singleRpcService = false;
-       }
-
-       public void setNumJobManagers(int numJobManagers) {
-               checkArgument(numJobManagers >= 1, "must have at least one 
JobManager");
+               this.configuration = new 
UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
                this.numJobManagers = numJobManagers;
-       }
-
-       public void setNumTaskManagers(int numTaskManagers) {
-               checkArgument(numTaskManagers >= 1, "must have at least one 
TaskManager");
                this.numTaskManagers = numTaskManagers;
-       }
-
-       public void setNumResourceManagers(int numResourceManagers) {
-               checkArgument(numResourceManagers >= 1, "must have at least one 
ResourceManager");
                this.numResourceManagers = numResourceManagers;
-       }
-
-       public void setNumTaskManagerSlots(int numTaskSlots) {
-               checkArgument(numTaskSlots >= 1, "must have at least one task 
slot per TaskManager");
-               
this.config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numTaskSlots);
-       }
-
-       public void setCommonRpcBindAddress(String bindAddress) {
-               checkNotNull(bindAddress, "bind address must not be null");
-               this.commonBindAddress = bindAddress;
-       }
-
-       public void setManagedMemoryPerTaskManager(long 
managedMemoryPerTaskManager) {
-               checkArgument(managedMemoryPerTaskManager > 0, "must have more 
than 0 MB of memory for the TaskManager.");
-               this.managedMemoryPerTaskManager = managedMemoryPerTaskManager;
+               this.rpcServiceSharing = 
Preconditions.checkNotNull(rpcServiceSharing);
+               this.commonBindAddress = commonBindAddress;
        }
 
        // 
------------------------------------------------------------------------
        //  getters
        // 
------------------------------------------------------------------------
 
-       public boolean getUseSingleRpcSystem() {
-               return singleRpcService;
+       public RpcServiceSharing getRpcServiceSharing() {
+               return rpcServiceSharing;
        }
 
        public int getNumJobManagers() {
@@ -127,107 +90,121 @@ public class MiniClusterConfiguration {
                return numResourceManagers;
        }
 
-       public int getNumSlotsPerTaskManager() {
-               return 
config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-       }
-
        public String getJobManagerBindAddress() {
                return commonBindAddress != null ?
                                commonBindAddress :
-                               config.getString(JobManagerOptions.ADDRESS, 
"localhost");
+                               
configuration.getString(JobManagerOptions.ADDRESS, "localhost");
        }
 
        public String getTaskManagerBindAddress() {
                return commonBindAddress != null ?
                                commonBindAddress :
-                               
config.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+                               
configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
        }
 
        public String getResourceManagerBindAddress() {
                return commonBindAddress != null ?
                        commonBindAddress :
-                       config.getString(JobManagerOptions.ADDRESS, 
"localhost"); // TODO: Introduce proper configuration constant for the resource 
manager hostname
+                       configuration.getString(JobManagerOptions.ADDRESS, 
"localhost"); // TODO: Introduce proper configuration constant for the resource 
manager hostname
        }
 
        public Time getRpcTimeout() {
-               FiniteDuration duration = AkkaUtils.getTimeout(config);
+               FiniteDuration duration = AkkaUtils.getTimeout(configuration);
                return Time.of(duration.length(), duration.unit());
        }
 
-       public long getManagedMemoryPerTaskManager() {
-               return getOrCalculateManagedMemoryPerTaskManager();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  utils
-       // 
------------------------------------------------------------------------
-
-       public Configuration generateConfiguration() {
-               Configuration newConfiguration = new Configuration(config);
-               // set the memory
-               long memory = getOrCalculateManagedMemoryPerTaskManager();
-               
newConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memory);
-
-               return newConfiguration;
+       public UnmodifiableConfiguration getConfiguration() {
+               return configuration;
        }
 
        @Override
        public String toString() {
                return "MiniClusterConfiguration {" +
-                               "singleRpcService=" + singleRpcService +
+                               "singleRpcService=" + rpcServiceSharing +
                                ", numJobManagers=" + numJobManagers +
                                ", numTaskManagers=" + numTaskManagers +
                                ", numResourceManagers=" + numResourceManagers +
                                ", commonBindAddress='" + commonBindAddress + 
'\'' +
-                               ", config=" + config +
+                               ", config=" + configuration +
                                '}';
        }
 
+       // 
----------------------------------------------------------------------------------
+       // Enums
+       // 
----------------------------------------------------------------------------------
+
        /**
-        * Get or calculate the managed memory per task manager. The memory is 
calculated in the
-        * following order:
-        *
-        * 1. Return {@link #managedMemoryPerTaskManager} if set
-        * 2. Return 
config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY) if set
-        * 3. Distribute the available free memory equally among all components 
(JMs, RMs and TMs) and
-        * calculate the managed memory from the share of memory for a single 
task manager.
-        *
-        * @return size of managed memory per task manager (in megabytes)
+        * Enum which defines whether the mini cluster components use a shared 
RpcService
+        * or whether every component gets its own dedicated RpcService started.
         */
-       private long getOrCalculateManagedMemoryPerTaskManager() {
-               if (managedMemoryPerTaskManager == -1) {
-                       // no memory set in the mini cluster configuration
+       public enum RpcServiceSharing {
+               SHARED, // a single shared rpc service
+               DEDICATED // every component gets its own dedicated rpc service
+       }
 
-                       long memorySize = 
config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+       // 
----------------------------------------------------------------------------------
+       // Builder
+       // 
----------------------------------------------------------------------------------
 
-                       // we could probably use config.contains() but the 
previous implementation compared to
-                       // the default (-1) thus allowing the user to 
explicitly specify this as well
-                       // -> don't change this behaviour now
-                       if (memorySize == 
TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) {
-                               // no memory set in the flink configuration
-                               // share the available memory among all running 
components
+       /**
+        * Builder for the MiniClusterConfiguration.
+        */
+       public static class Builder {
+               private Configuration configuration = new Configuration();
+               private int numJobManagers = 1;
+               private int numTaskManagers = 1;
+               private int numSlotsPerTaskManager = 1;
+               private int numResourceManagers = 1;
+               private RpcServiceSharing rpcServiceSharing = SHARED;
+               @Nullable
+               private String commonBindAddress = null;
+
+               public Builder setConfiguration(Configuration configuration1) {
+                       this.configuration = 
Preconditions.checkNotNull(configuration1);
+                       return this;
+               }
 
-                               float memoryFraction = 
config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+               public Builder setNumJobManagers(int numJobManagers) {
+                       this.numJobManagers = numJobManagers;
+                       return this;
+               }
 
-                               long freeMemory = 
EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
+               public Builder setNumTaskManagers(int numTaskManagers) {
+                       this.numTaskManagers = numTaskManagers;
+                       return this;
+               }
 
-                               // we assign each component the same amount of 
free memory
-                               // (might be a bit of overkill for the JMs and 
RMs)
-                               long memoryPerComponent = freeMemory / 
(numTaskManagers + numResourceManagers + numJobManagers);
+               public Builder setNumSlotsPerTaskManager(int 
numSlotsPerTaskManager) {
+                       this.numSlotsPerTaskManager = numSlotsPerTaskManager;
+                       return this;
+               }
 
-                               // subtract the network buffer memory
-                               long networkBuffersMemory = 
TaskManagerServices.calculateNetworkBufferMemory(memoryPerComponent, config);
-                               long memoryMinusNetworkBuffers = 
memoryPerComponent - networkBuffersMemory;
+               public Builder setNumResourceManagers(int numResourceManagers) {
+                       this.numResourceManagers = numResourceManagers;
+                       return this;
+               }
 
-                               // calculate the managed memory size
-                               long managedMemoryBytes = (long) 
(memoryMinusNetworkBuffers * memoryFraction);
+               public Builder setRpcServiceSharing(RpcServiceSharing 
rpcServiceSharing) {
+                       this.rpcServiceSharing = 
Preconditions.checkNotNull(rpcServiceSharing);
+                       return this;
+               }
+
+               public Builder setCommonBindAddress(String commonBindAddress) {
+                       this.commonBindAddress = commonBindAddress;
+                       return this;
+               }
 
-                               return managedMemoryBytes >> 20; // bytes to 
megabytes
-                       } else {
-                               return memorySize;
-                       }
-               } else {
-                       return managedMemoryPerTaskManager;
+               public MiniClusterConfiguration build() {
+                       final Configuration modifiedConfiguration = new 
Configuration(configuration);
+                       
modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTaskManager);
+
+                       return new MiniClusterConfiguration(
+                               modifiedConfiguration,
+                               numJobManagers,
+                               numTaskManagers,
+                               numResourceManagers,
+                               rpcServiceSharing,
+                               commonBindAddress);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a9743eb6/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index f90367c..8ca1329 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -40,13 +40,13 @@ public class MiniClusterITCase extends TestLogger {
        //  Simple Job Running Tests
        // 
------------------------------------------------------------------------
 
+       private static final MiniClusterConfiguration defaultConfiguration = 
null;
+
        @Test
        public void runJobWithSingleRpcService() throws Exception {
-               MiniClusterConfiguration cfg = new MiniClusterConfiguration();
-
-               // should be the default, but set anyways to make sure the test
-               // stays valid when the default changes
-               cfg.setUseSingleRpcService();
+               MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       
.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
+                       .build();
 
                MiniCluster miniCluster = new MiniCluster(cfg);
                try {
@@ -60,8 +60,9 @@ public class MiniClusterITCase extends TestLogger {
 
        @Test
        public void runJobWithMultipleRpcServices() throws Exception {
-               MiniClusterConfiguration cfg = new MiniClusterConfiguration();
-               cfg.setUseRpcServicePerComponent();
+               MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       
.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.DEDICATED)
+                       .build();
 
                MiniCluster miniCluster = new MiniCluster(cfg);
                try {
@@ -75,8 +76,9 @@ public class MiniClusterITCase extends TestLogger {
 
        @Test
        public void runJobWithMultipleJobManagers() throws Exception {
-               MiniClusterConfiguration cfg = new MiniClusterConfiguration();
-               cfg.setNumJobManagers(3);
+               MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+               .setNumJobManagers(3)
+               .build();
 
                MiniCluster miniCluster = new MiniCluster(cfg);
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9743eb6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index cebd15f..e276ac7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -99,15 +99,17 @@ public class Flip6LocalStreamEnvironment extends 
StreamExecutionEnvironment {
                // add (and override) the settings with what the user defined
                configuration.addAll(this.conf);
 
-               MiniClusterConfiguration cfg = new 
MiniClusterConfiguration(configuration);
-
                // Currently we do not reuse slot anymore,
                // so we need to sum up the parallelism of all vertices
                int slotsCount = 0;
                for (JobVertex jobVertex : jobGraph.getVertices()) {
                        slotsCount += jobVertex.getParallelism();
                }
-               cfg.setNumTaskManagerSlots(slotsCount);
+
+               MiniClusterConfiguration cfg = new 
MiniClusterConfiguration.Builder()
+                       .setConfiguration(configuration)
+                       .setNumSlotsPerTaskManager(slotsCount)
+                       .build();
 
                if (LOG.isInfoEnabled()) {
                        LOG.info("Running job on local embedded Flink mini 
cluster");

Reply via email to