[FLINK-7823][QS] Update Queryable State configuration parameters.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84746a86 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84746a86 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84746a86 Branch: refs/heads/master Commit: 84746a861ae02dd1dbcf1938d7ae2bf0d604e35f Parents: e8931bd Author: kkloudas <[email protected]> Authored: Mon Nov 6 12:43:18 2017 +0100 Committer: kkloudas <[email protected]> Committed: Tue Nov 7 14:07:54 2017 +0100 ---------------------------------------------------------------------- .../configuration/QueryableStateOptions.java | 25 +++---- .../network/AbstractServerBase.java | 8 +-- .../HAAbstractQueryableStateTestBase.java | 2 +- .../NonHAAbstractQueryableStateTestBase.java | 2 +- .../network/AbstractServerTest.java | 2 +- .../runtime/io/network/NetworkEnvironment.java | 2 +- .../QueryableStateConfiguration.java | 71 +++++++++++++------- .../taskexecutor/TaskManagerServices.java | 56 +++++++-------- .../TaskManagerServicesConfiguration.java | 38 ++++++----- .../minicluster/LocalFlinkMiniCluster.scala | 7 -- 10 files changed, 114 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java index adba938..ac88bed 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java @@ -32,11 +32,6 @@ public class QueryableStateOptions { // Server Options // ------------------------------------------------------------------------ - /** Flag to indicate whether to start the queryable state server. */ - public static final ConfigOption<Boolean> SERVER_ENABLE = - key("query.server.enable") - .defaultValue(true); - /** * The config parameter defining the server port range of the queryable state proxy. * @@ -59,6 +54,16 @@ public class QueryableStateOptions { key("query.proxy.ports") .defaultValue("9069"); + /** Number of network (event loop) threads for the client proxy (0 => #slots). */ + public static final ConfigOption<Integer> PROXY_NETWORK_THREADS = + key("query.proxy.network-threads") + .defaultValue(0); + + /** Number of async query threads for the client proxy (0 => #slots). */ + public static final ConfigOption<Integer> PROXY_ASYNC_QUERY_THREADS = + key("query.proxy.query-threads") + .defaultValue(0); + /** * The config parameter defining the server port range of the queryable state server. * @@ -100,16 +105,6 @@ public class QueryableStateOptions { key("query.client.network-threads") .defaultValue(0); - /** Number of retries on location lookup failures. */ - public static final ConfigOption<Integer> CLIENT_LOOKUP_RETRIES = - key("query.client.lookup.num-retries") - .defaultValue(3); - - /** Retry delay on location lookup failures (millis). */ - public static final ConfigOption<Integer> CLIENT_LOOKUP_RETRY_DELAY = - key("query.client.lookup.retry-delay") - .defaultValue(1000); - // ------------------------------------------------------------------------ /** Not intended to be instantiated. */ http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java index 9c88774c..07ca26d 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java @@ -180,16 +180,16 @@ public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends M */ public void start() throws Throwable { Preconditions.checkState(serverAddress == null, - "Server " + serverName + " already running @ " + serverAddress + '.'); + "The " + serverName + " already running @ " + serverAddress + '.'); Iterator<Integer> portIterator = bindPortRange.iterator(); while (portIterator.hasNext() && !attemptToBind(portIterator.next())) {} if (serverAddress != null) { - LOG.info("Started server {} @ {}.", serverName, serverAddress); + LOG.info("Started the {} @ {}.", serverName, serverAddress); } else { - LOG.info("Unable to start server {}. All ports in provided range are occupied.", serverName); - throw new FlinkRuntimeException("Unable to start server " + serverName + ". All ports in provided range are occupied."); + LOG.info("Unable to start the {}. All ports in provided range are occupied.", serverName); + throw new FlinkRuntimeException("Unable to start the " + serverName + ". All ports in provided range are occupied."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java index ab75cf4..fc4b2bc 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java @@ -55,8 +55,8 @@ public abstract class HAAbstractQueryableStateTestBase extends AbstractQueryable config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); - config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2); + config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2); config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2); config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS)); config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS)); http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java index 3f1a1fb..6945cca 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateTestBase.java @@ -46,7 +46,7 @@ public abstract class NonHAAbstractQueryableStateTestBase extends AbstractQuerya config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM); config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1); - config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true); + config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 1); config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1); config.setString(QueryableStateOptions.PROXY_PORT_RANGE, proxyPortRangeStart + "-" + (proxyPortRangeStart + NUM_TMS)); config.setString(QueryableStateOptions.SERVER_PORT_RANGE, serverPortRangeStart + "-" + (serverPortRangeStart + NUM_TMS)); http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java index 0b2727c..2775cd4 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java @@ -58,7 +58,7 @@ public class AbstractServerTest { // the expected exception along with the adequate message expectedEx.expect(FlinkRuntimeException.class); - expectedEx.expectMessage("Unable to start server Test Server 2. All ports in provided range are occupied."); + expectedEx.expectMessage("Unable to start the Test Server 2. All ports in provided range are occupied."); TestServer server1 = null; TestServer server2 = null; http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index cb43fbf..4fffacd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -310,7 +310,7 @@ public class NetworkEnvironment { if (kvStateServer != null) { try { kvStateServer.start(); - LOG.info("Started Queryable State Data Server @ {}", kvStateServer.getServerAddress()); + LOG.info("Started the Queryable State Data Server @ {}", kvStateServer.getServerAddress()); } catch (Throwable ie) { kvStateServer.shutdown(); kvStateServer = null; http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java index 5e6b7c5..7823a1a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/QueryableStateConfiguration.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.util.NetUtils; + import java.util.Iterator; import static org.apache.flink.util.Preconditions.checkArgument; @@ -27,45 +30,44 @@ import static org.apache.flink.util.Preconditions.checkArgument; */ public class QueryableStateConfiguration { - private final boolean enabled; - private final Iterator<Integer> proxyPortRange; private final Iterator<Integer> qserverPortRange; + private final int numProxyThreads; + + private final int numPQueryThreads; + private final int numServerThreads; - private final int numQueryThreads; + private final int numSQueryThreads; public QueryableStateConfiguration( - boolean enabled, Iterator<Integer> proxyPortRange, Iterator<Integer> qserverPortRange, + int numProxyThreads, + int numPQueryThreads, int numServerThreads, - int numQueryThreads) { + int numSQueryThreads) { - checkArgument(!enabled || (proxyPortRange != null && proxyPortRange.hasNext())); - checkArgument(!enabled || (qserverPortRange != null && qserverPortRange.hasNext())); + checkArgument(proxyPortRange != null && proxyPortRange.hasNext()); + checkArgument(qserverPortRange != null && qserverPortRange.hasNext()); + checkArgument(numProxyThreads >= 0, "queryable state number of server threads must be zero or larger"); + checkArgument(numPQueryThreads >= 0, "queryable state number of query threads must be zero or larger"); checkArgument(numServerThreads >= 0, "queryable state number of server threads must be zero or larger"); - checkArgument(numQueryThreads >= 0, "queryable state number of query threads must be zero or larger"); + checkArgument(numSQueryThreads >= 0, "queryable state number of query threads must be zero or larger"); - this.enabled = enabled; this.proxyPortRange = proxyPortRange; this.qserverPortRange = qserverPortRange; + this.numProxyThreads = numProxyThreads; + this.numPQueryThreads = numPQueryThreads; this.numServerThreads = numServerThreads; - this.numQueryThreads = numQueryThreads; + this.numSQueryThreads = numSQueryThreads; } // ------------------------------------------------------------------------ /** - * Returns whether queryable state is enabled. - */ - public boolean isEnabled() { - return enabled; - } - - /** * Returns the port range where the queryable state client proxy can listen. * See {@link org.apache.flink.configuration.QueryableStateOptions#PROXY_PORT_RANGE QueryableStateOptions.PROXY_PORT_RANGE}. */ @@ -85,7 +87,23 @@ public class QueryableStateConfiguration { * Returns the number of threads for the query server NIO event loop. * These threads only process network events and dispatch query requests to the query threads. */ - public int numServerThreads() { + public int numProxyServerThreads() { + return numProxyThreads; + } + + /** + * Returns the number of threads for the thread pool that performs the actual state lookup. + * These threads perform the actual state lookup. + */ + public int numProxyQueryThreads() { + return numPQueryThreads; + } + + /** + * Returns the number of threads for the query server NIO event loop. + * These threads only process network events and dispatch query requests to the query threads. + */ + public int numStateServerThreads() { return numServerThreads; } @@ -93,18 +111,19 @@ public class QueryableStateConfiguration { * Returns the number of threads for the thread pool that performs the actual state lookup. * These threads perform the actual state lookup. */ - public int numQueryThreads() { - return numQueryThreads; + public int numStateQueryThreads() { + return numSQueryThreads; } // ------------------------------------------------------------------------ @Override public String toString() { - return "QueryableStateConfiguration {" + - "enabled=" + enabled + - ", numServerThreads=" + numServerThreads + - ", numQueryThreads=" + numQueryThreads + + return "QueryableStateConfiguration{" + + "numProxyServerThreads=" + numProxyThreads + + ", numProxyQueryThreads=" + numPQueryThreads + + ", numStateServerThreads=" + numServerThreads + + ", numStateQueryThreads=" + numSQueryThreads + '}'; } @@ -114,6 +133,8 @@ public class QueryableStateConfiguration { * Gets the configuration describing the queryable state as deactivated. */ public static QueryableStateConfiguration disabled() { - return new QueryableStateConfiguration(false, null, null, 0, 0); + final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString(QueryableStateOptions.PROXY_PORT_RANGE.defaultValue()); + final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString(QueryableStateOptions.SERVER_PORT_RANGE.defaultValue()); + return new QueryableStateConfiguration(proxyPorts, serverPorts, 0, 0, 0, 0); } } http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index aed03f6..4daff05 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -327,33 +327,35 @@ public class TaskManagerServices { TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); KvStateRegistry kvStateRegistry = new KvStateRegistry(); - KvStateClientProxy kvClientProxy = null; - KvStateServer kvStateServer = null; - - if (taskManagerServicesConfiguration.getQueryableStateConfig().isEnabled()) { - QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig(); - - int numNetworkThreads = qsConfig.numServerThreads() == 0 ? - taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numServerThreads(); - - int numQueryThreads = qsConfig.numQueryThreads() == 0 ? - taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numQueryThreads(); - - kvClientProxy = QueryableStateUtils.createKvStateClientProxy( - taskManagerServicesConfiguration.getTaskManagerAddress(), - qsConfig.getProxyPortRange(), - numNetworkThreads, - numQueryThreads, - new DisabledKvStateRequestStats()); - - kvStateServer = QueryableStateUtils.createKvStateServer( - taskManagerServicesConfiguration.getTaskManagerAddress(), - qsConfig.getStateServerPortRange(), - numNetworkThreads, - numQueryThreads, - kvStateRegistry, - new DisabledKvStateRequestStats()); - } + + QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig(); + + int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ? + taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads(); + + int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ? + taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads(); + + final KvStateClientProxy kvClientProxy = QueryableStateUtils.createKvStateClientProxy( + taskManagerServicesConfiguration.getTaskManagerAddress(), + qsConfig.getProxyPortRange(), + numProxyServerNetworkThreads, + numProxyServerQueryThreads, + new DisabledKvStateRequestStats()); + + int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ? + taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads(); + + int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ? + taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads(); + + final KvStateServer kvStateServer = QueryableStateUtils.createKvStateServer( + taskManagerServicesConfiguration.getTaskManagerAddress(), + qsConfig.getStateServerPortRange(), + numStateServerNetworkThreads, + numStateServerQueryThreads, + kvStateRegistry, + new DisabledKvStateRequestStats()); // we start the network first, to make sure it can allocate its buffers first return new NetworkEnvironment( http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 990fb22..bae683b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -400,23 +400,27 @@ public class TaskManagerServicesConfiguration { * Creates the {@link QueryableStateConfiguration} from the given Configuration. */ private static QueryableStateConfiguration parseQueryableStateConfiguration(Configuration config) { - final boolean enabled = config.getBoolean(QueryableStateOptions.SERVER_ENABLE); - - if (enabled) { - final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString( - config.getString(QueryableStateOptions.PROXY_PORT_RANGE, - QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); - final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString( - config.getString(QueryableStateOptions.SERVER_PORT_RANGE, - QueryableStateOptions.SERVER_PORT_RANGE.defaultValue())); - - final int numNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS); - final int numQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS); - return new QueryableStateConfiguration(true, proxyPorts, serverPorts, numNetworkThreads, numQueryThreads); - } - else { - return QueryableStateConfiguration.disabled(); - } + + final Iterator<Integer> proxyPorts = NetUtils.getPortRangeFromString( + config.getString(QueryableStateOptions.PROXY_PORT_RANGE, + QueryableStateOptions.PROXY_PORT_RANGE.defaultValue())); + final Iterator<Integer> serverPorts = NetUtils.getPortRangeFromString( + config.getString(QueryableStateOptions.SERVER_PORT_RANGE, + QueryableStateOptions.SERVER_PORT_RANGE.defaultValue())); + + final int numProxyServerNetworkThreads = config.getInteger(QueryableStateOptions.PROXY_NETWORK_THREADS); + final int numProxyServerQueryThreads = config.getInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS); + + final int numStateServerNetworkThreads = config.getInteger(QueryableStateOptions.SERVER_NETWORK_THREADS); + final int numStateServerQueryThreads = config.getInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS); + + return new QueryableStateConfiguration( + proxyPorts, + serverPorts, + numProxyServerNetworkThreads, + numProxyServerQueryThreads, + numStateServerNetworkThreads, + numStateServerQueryThreads); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/84746a86/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 89197e2..8ef2e36 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -94,13 +94,6 @@ class LocalFlinkMiniCluster( config.addAll(userConfiguration) setMemory(config) initializeIOFormatClasses(config) - - // Disable queryable state server if nothing else is configured explicitly - if (!config.containsKey(QueryableStateOptions.SERVER_ENABLE.key())) { - LOG.info("Disabled queryable state server") - config.setBoolean(QueryableStateOptions.SERVER_ENABLE, false) - } - config }
