[FLINK-7823][QS] Update Queryable State configuration parameters.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84746a86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84746a86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84746a86

Branch: refs/heads/master
Commit: 84746a861ae02dd1dbcf1938d7ae2bf0d604e35f
Parents: e8931bd
Author: kkloudas <[email protected]>
Authored: Mon Nov 6 12:43:18 2017 +0100
Committer: kkloudas <[email protected]>
Committed: Tue Nov 7 14:07:54 2017 +0100

----------------------------------------------------------------------
 .../configuration/QueryableStateOptions.java    | 25 +++----
 .../network/AbstractServerBase.java             |  8 +--
 .../HAAbstractQueryableStateTestBase.java       |  2 +-
 .../NonHAAbstractQueryableStateTestBase.java    |  2 +-
 .../network/AbstractServerTest.java             |  2 +-
 .../runtime/io/network/NetworkEnvironment.java  |  2 +-
 .../QueryableStateConfiguration.java            | 71 +++++++++++++-------
 .../taskexecutor/TaskManagerServices.java       | 56 +++++++--------
 .../TaskManagerServicesConfiguration.java       | 38 ++++++-----
 .../minicluster/LocalFlinkMiniCluster.scala     |  7 --
 10 files changed, 114 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index adba938..ac88bed 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -32,11 +32,6 @@ public class QueryableStateOptions {
        // Server Options
        // 
------------------------------------------------------------------------
 
-       /** Flag to indicate whether to start the queryable state server. */
-       public static final ConfigOption<Boolean> SERVER_ENABLE =
-                       key("query.server.enable")
-                       .defaultValue(true);
-
        /**
         * The config parameter defining the server port range of the queryable 
state proxy.
         *
@@ -59,6 +54,16 @@ public class QueryableStateOptions {
                        key("query.proxy.ports")
                        .defaultValue("9069");
 
+       /** Number of network (event loop) threads for the client proxy (0 => 
#slots). */
+       public static final ConfigOption<Integer> PROXY_NETWORK_THREADS =
+                       key("query.proxy.network-threads")
+                                       .defaultValue(0);
+
+       /** Number of async query threads for the client proxy (0 => #slots). */
+       public static final ConfigOption<Integer> PROXY_ASYNC_QUERY_THREADS =
+                       key("query.proxy.query-threads")
+                                       .defaultValue(0);
+
        /**
         * The config parameter defining the server port range of the queryable 
state server.
         *
@@ -100,16 +105,6 @@ public class QueryableStateOptions {
                        key("query.client.network-threads")
                        .defaultValue(0);
 
-       /** Number of retries on location lookup failures. */
-       public static final ConfigOption<Integer> CLIENT_LOOKUP_RETRIES =
-                       key("query.client.lookup.num-retries")
-                       .defaultValue(3);
-
-       /** Retry delay on location lookup failures (millis). */
-       public static final ConfigOption<Integer> CLIENT_LOOKUP_RETRY_DELAY =
-                       key("query.client.lookup.retry-delay")
-                       .defaultValue(1000);
-
        // 
------------------------------------------------------------------------
 
        /** Not intended to be instantiated. */

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 9c88774c..07ca26d 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -180,16 +180,16 @@ public abstract class AbstractServerBase<REQ extends 
MessageBody, RESP extends M
         */
        public void start() throws Throwable {
                Preconditions.checkState(serverAddress == null,
-                               "Server " + serverName + " already running @ " 
+ serverAddress + '.');
+                               "The " + serverName + " already running @ " + 
serverAddress + '.');
 
                Iterator<Integer> portIterator = bindPortRange.iterator();
                while (portIterator.hasNext() && 
!attemptToBind(portIterator.next())) {}
 
                if (serverAddress != null) {
-                       LOG.info("Started server {} @ {}.", serverName, 
serverAddress);
+                       LOG.info("Started the {} @ {}.", serverName, 
serverAddress);
                } else {
-                       LOG.info("Unable to start server {}. All ports in 
provided range are occupied.", serverName);
-                       throw new FlinkRuntimeException("Unable to start server 
" + serverName + ". All ports in provided range are occupied.");
+                       LOG.info("Unable to start the {}. All ports in provided 
range are occupied.", serverName);
+                       throw new FlinkRuntimeException("Unable to start the " 
+ serverName + ". All ports in provided range are occupied.");
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index ab75cf4..fc4b2bc 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -55,8 +55,8 @@ public abstract class HAAbstractQueryableStateTestBase 
extends AbstractQueryable
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
-                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
                        
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
+                       
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
                        
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
                        
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + 
"-" + (proxyPortRangeStart + NUM_TMS));
                        
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart 
+ "-" + (serverPortRangeStart + NUM_TMS));

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
index 3f1a1fb..6945cca 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java
@@ -46,7 +46,7 @@ public abstract class NonHAAbstractQueryableStateTestBase 
extends AbstractQuerya
                        
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
                        
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
                        
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
-                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
+                       
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1);
                        
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
                        
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + 
"-" + (proxyPortRangeStart + NUM_TMS));
                        
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart 
+ "-" + (serverPortRangeStart + NUM_TMS));

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 0b2727c..2775cd4 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -58,7 +58,7 @@ public class AbstractServerTest {
 
                // the expected exception along with the adequate message
                expectedEx.expect(FlinkRuntimeException.class);
-               expectedEx.expectMessage("Unable to start server Test Server 2. 
All ports in provided range are occupied.");
+               expectedEx.expectMessage("Unable to start the Test Server 2. 
All ports in provided range are occupied.");
 
                TestServer server1 = null;
                TestServer server2 = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index cb43fbf..4fffacd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -310,7 +310,7 @@ public class NetworkEnvironment {
                        if (kvStateServer != null) {
                                try {
                                        kvStateServer.start();
-                                       LOG.info("Started Queryable State Data 
Server @ {}", kvStateServer.getServerAddress());
+                                       LOG.info("Started the Queryable State 
Data Server @ {}", kvStateServer.getServerAddress());
                                } catch (Throwable ie) {
                                        kvStateServer.shutdown();
                                        kvStateServer = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
index 5e6b7c5..7823a1a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.util.NetUtils;
+
 import java.util.Iterator;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -27,45 +30,44 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  */
 public class QueryableStateConfiguration {
 
-       private final boolean enabled;
-
        private final Iterator<Integer> proxyPortRange;
 
        private final Iterator<Integer> qserverPortRange;
 
+       private final int numProxyThreads;
+
+       private final int numPQueryThreads;
+
        private final int numServerThreads;
 
-       private final int numQueryThreads;
+       private final int numSQueryThreads;
 
        public QueryableStateConfiguration(
-                       boolean enabled,
                        Iterator<Integer> proxyPortRange,
                        Iterator<Integer> qserverPortRange,
+                       int numProxyThreads,
+                       int numPQueryThreads,
                        int numServerThreads,
-                       int numQueryThreads) {
+                       int numSQueryThreads) {
 
-               checkArgument(!enabled || (proxyPortRange != null && 
proxyPortRange.hasNext()));
-               checkArgument(!enabled || (qserverPortRange != null && 
qserverPortRange.hasNext()));
+               checkArgument(proxyPortRange != null && 
proxyPortRange.hasNext());
+               checkArgument(qserverPortRange != null && 
qserverPortRange.hasNext());
+               checkArgument(numProxyThreads >= 0, "queryable state number of 
server threads must be zero or larger");
+               checkArgument(numPQueryThreads >= 0, "queryable state number of 
query threads must be zero or larger");
                checkArgument(numServerThreads >= 0, "queryable state number of 
server threads must be zero or larger");
-               checkArgument(numQueryThreads >= 0, "queryable state number of 
query threads must be zero or larger");
+               checkArgument(numSQueryThreads >= 0, "queryable state number of 
query threads must be zero or larger");
 
-               this.enabled = enabled;
                this.proxyPortRange = proxyPortRange;
                this.qserverPortRange = qserverPortRange;
+               this.numProxyThreads = numProxyThreads;
+               this.numPQueryThreads = numPQueryThreads;
                this.numServerThreads = numServerThreads;
-               this.numQueryThreads = numQueryThreads;
+               this.numSQueryThreads = numSQueryThreads;
        }
 
        // 
------------------------------------------------------------------------
 
        /**
-        * Returns whether queryable state is enabled.
-        */
-       public boolean isEnabled() {
-               return enabled;
-       }
-
-       /**
         * Returns the port range where the queryable state client proxy can 
listen.
         * See {@link 
org.apache.flink.configuration.QueryableStateOptions#PROXY_PORT_RANGE 
QueryableStateOptions.PROXY_PORT_RANGE}.
         */
@@ -85,7 +87,23 @@ public class QueryableStateConfiguration {
         * Returns the number of threads for the query server NIO event loop.
         * These threads only process network events and dispatch query 
requests to the query threads.
         */
-       public int numServerThreads() {
+       public int numProxyServerThreads() {
+               return numProxyThreads;
+       }
+
+       /**
+        * Returns the number of async query threads of the queryable state client proxy.
+        * See {@link QueryableStateOptions#PROXY_ASYNC_QUERY_THREADS}.
+        */
+       public int numProxyQueryThreads() {
+               return numPQueryThreads;
+       }
+
+       /**
+        * Returns the number of threads for the query server NIO event loop.
+        * These threads only process network events and dispatch query 
requests to the query threads.
+        */
+       public int numStateServerThreads() {
                return numServerThreads;
        }
 
@@ -93,18 +111,19 @@ public class QueryableStateConfiguration {
         * Returns the number of threads for the thread pool that performs the 
actual state lookup.
         * These threads perform the actual state lookup.
         */
-       public int numQueryThreads() {
-               return numQueryThreads;
+       public int numStateQueryThreads() {
+               return numSQueryThreads;
        }
 
        // 
------------------------------------------------------------------------
 
        @Override
        public String toString() {
-               return "QueryableStateConfiguration {" +
-                               "enabled=" + enabled +
-                               ", numServerThreads=" + numServerThreads +
-                               ", numQueryThreads=" + numQueryThreads +
+               return "QueryableStateConfiguration{" +
+                               "numProxyServerThreads=" + numProxyThreads +
+                               ", numProxyQueryThreads=" + numPQueryThreads +
+                               ", numStateServerThreads=" + numServerThreads +
+                               ", numStateQueryThreads=" + numSQueryThreads +
                                '}';
        }
 
@@ -114,6 +133,8 @@ public class QueryableStateConfiguration {
         * Gets the configuration describing the queryable state as deactivated.
         */
        public static QueryableStateConfiguration disabled() {
-               return new QueryableStateConfiguration(false, null, null, 0, 0);
+               final Iterator<Integer> proxyPorts = 
NetUtils.getPortRangeFromString(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue());
+               final Iterator<Integer> serverPorts = 
NetUtils.getPortRangeFromString(QueryableStateOptions.SERVER_PORT_RANGE.defaultValue());
+               return new QueryableStateConfiguration(proxyPorts, serverPorts, 
0, 0, 0, 0);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index aed03f6..4daff05 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -327,33 +327,35 @@ public class TaskManagerServices {
                TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
 
                KvStateRegistry kvStateRegistry = new KvStateRegistry();
-               KvStateClientProxy kvClientProxy = null;
-               KvStateServer kvStateServer = null;
-
-               if 
(taskManagerServicesConfiguration.getQueryableStateConfig().isEnabled()) {
-                       QueryableStateConfiguration qsConfig = 
taskManagerServicesConfiguration.getQueryableStateConfig();
-
-                       int numNetworkThreads = qsConfig.numServerThreads() == 
0 ?
-                                       
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numServerThreads();
-
-                       int numQueryThreads = qsConfig.numQueryThreads() == 0 ?
-                                       
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numQueryThreads();
-
-                       kvClientProxy = 
QueryableStateUtils.createKvStateClientProxy(
-                                       
taskManagerServicesConfiguration.getTaskManagerAddress(),
-                                       qsConfig.getProxyPortRange(),
-                                       numNetworkThreads,
-                                       numQueryThreads,
-                                       new DisabledKvStateRequestStats());
-
-                       kvStateServer = QueryableStateUtils.createKvStateServer(
-                                       
taskManagerServicesConfiguration.getTaskManagerAddress(),
-                                       qsConfig.getStateServerPortRange(),
-                                       numNetworkThreads,
-                                       numQueryThreads,
-                                       kvStateRegistry,
-                                       new DisabledKvStateRequestStats());
-               }
+
+               QueryableStateConfiguration qsConfig = 
taskManagerServicesConfiguration.getQueryableStateConfig();
+
+               int numProxyServerNetworkThreads = 
qsConfig.numProxyServerThreads() == 0 ?
+                               
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numProxyServerThreads();
+
+               int numProxyServerQueryThreads = 
qsConfig.numProxyQueryThreads() == 0 ?
+                               
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numProxyQueryThreads();
+
+               final KvStateClientProxy kvClientProxy = 
QueryableStateUtils.createKvStateClientProxy(
+                               
taskManagerServicesConfiguration.getTaskManagerAddress(),
+                               qsConfig.getProxyPortRange(),
+                               numProxyServerNetworkThreads,
+                               numProxyServerQueryThreads,
+                               new DisabledKvStateRequestStats());
+
+               int numStateServerNetworkThreads = 
qsConfig.numStateServerThreads() == 0 ?
+                               
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numStateServerThreads();
+
+               int numStateServerQueryThreads = 
qsConfig.numStateQueryThreads() == 0 ?
+                               
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numStateQueryThreads();
+
+               final KvStateServer kvStateServer = 
QueryableStateUtils.createKvStateServer(
+                               
taskManagerServicesConfiguration.getTaskManagerAddress(),
+                               qsConfig.getStateServerPortRange(),
+                               numStateServerNetworkThreads,
+                               numStateServerQueryThreads,
+                               kvStateRegistry,
+                               new DisabledKvStateRequestStats());
 
                // we start the network first, to make sure it can allocate its 
buffers first
                return new NetworkEnvironment(

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 990fb22..bae683b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -400,23 +400,27 @@ public class TaskManagerServicesConfiguration {
         * Creates the {@link QueryableStateConfiguration} from the given 
Configuration.
         */
        private static QueryableStateConfiguration 
parseQueryableStateConfiguration(Configuration config) {
-               final boolean enabled = 
config.getBoolean(QueryableStateOptions.SERVER_ENABLE);
-
-               if (enabled) {
-                       final Iterator<Integer> proxyPorts = 
NetUtils.getPortRangeFromString(
-                                       
config.getString(QueryableStateOptions.PROXY_PORT_RANGE,
-                                                       
QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
-                       final Iterator<Integer> serverPorts = 
NetUtils.getPortRangeFromString(
-                                       
config.getString(QueryableStateOptions.SERVER_PORT_RANGE,
-                                                       
QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()));
-
-                       final int numNetworkThreads = 
config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS);
-                       final int numQueryThreads = 
config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS);
-                       return new QueryableStateConfiguration(true, 
proxyPorts, serverPorts, numNetworkThreads, numQueryThreads);
-               }
-               else {
-                       return QueryableStateConfiguration.disabled();
-               }
+
+               final Iterator<Integer> proxyPorts = 
NetUtils.getPortRangeFromString(
+                               
config.getString(QueryableStateOptions.PROXY_PORT_RANGE,
+                                               
QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()));
+               final Iterator<Integer> serverPorts = 
NetUtils.getPortRangeFromString(
+                               
config.getString(QueryableStateOptions.SERVER_PORT_RANGE,
+                                               
QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()));
+
+               final int numProxyServerNetworkThreads = 
config.getInteger(QueryableStateOptions.PROXY_NETWORK_THREADS);
+               final int numProxyServerQueryThreads = 
config.getInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS);
+
+               final int numStateServerNetworkThreads = 
config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS);
+               final int numStateServerQueryThreads = 
config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS);
+
+               return new QueryableStateConfiguration(
+                               proxyPorts,
+                               serverPorts,
+                               numProxyServerNetworkThreads,
+                               numProxyServerQueryThreads,
+                               numStateServerNetworkThreads,
+                               numStateServerQueryThreads);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 89197e2..8ef2e36 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -94,13 +94,6 @@ class LocalFlinkMiniCluster(
     config.addAll(userConfiguration)
     setMemory(config)
     initializeIOFormatClasses(config)
-
-    // Disable queryable state server if nothing else is configured explicitly
-    if (!config.containsKey(QueryableStateOptions.SERVER_ENABLE.key())) {
-      LOG.info("Disabled queryable state server")
-      config.setBoolean(QueryableStateOptions.SERVER_ENABLE, false)
-    }
-
     config
   }
 

Reply via email to