aljoscha closed pull request #7412: [FLINK-10866][Runtime] 1. Explicitly enable 
qs server and proxy. 2. QS
URL: https://github.com/apache/flink/pull/7412
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/queryable_state_configuration.html 
b/docs/_includes/generated/queryable_state_configuration.html
index c457c40e147..5940fdc8229 100644
--- a/docs/_includes/generated/queryable_state_configuration.html
+++ b/docs/_includes/generated/queryable_state_configuration.html
@@ -42,5 +42,10 @@
             <td style="word-wrap: break-word;">0</td>
             <td>Number of query Threads for queryable state server. Uses the 
number of slots if set to 0.</td>
         </tr>
+        <tr>
+            <td><h5>queryable-state.enable</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Option whether the queryable state proxy and server should be 
enabled where possible and configurable.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index 6a8041926b7..20c6b53a209 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -106,6 +106,16 @@
                        .defaultValue(0)
                        .withDescription("Number of query Threads for queryable 
state server. Uses the number of slots if set to 0.");
 
+       /** Option whether the queryable state proxy and server should be 
enabled where possible and configurable.
+        *
+        * <p>Queryable state proxy and server are still more experimental 
features, hence disabled unless they are enable
+        * in user configuration. */
+       public static final ConfigOption<Boolean> 
ENABLE_QUERYABLE_STATE_PROXY_SERVER =
+               key("queryable-state.enable")
+                       .defaultValue(false)
+                       .withDescription("Option whether the queryable state 
proxy and server should be enabled where possible" +
+                               " and configurable.");
+
        // 
------------------------------------------------------------------------
        // Client Options
        // 
------------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index 9755b52e2f1..73520248475 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -95,6 +95,7 @@ public static void tearDown() throws Exception {
        private static Configuration getConfig() throws Exception {
 
                Configuration config = new Configuration();
+               
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
                config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
                config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 
NUM_JMS);
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TMS);
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 94baaed3745..efade709489 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -95,6 +95,7 @@ public static void tearDown() throws Exception {
        private static Configuration getConfig() throws Exception {
 
                Configuration config = new Configuration();
+               
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
                config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
                config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 
NUM_JMS);
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TMS);
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
index 36637c0d342..0cc4b0b3812 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -79,6 +79,7 @@ public static void tearDown() {
 
        private static Configuration getConfig() {
                Configuration config = new Configuration();
+               
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
                config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TMS);
                config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index ce3e665fce6..38b2877e551 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -78,6 +78,7 @@ public static void tearDown() {
 
        private static Configuration getConfig() {
                Configuration config = new Configuration();
+               
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
                config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_TMS);
                config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d124456c213..89e23dac1b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -345,7 +345,7 @@ public void start() throws IOException {
                                } catch (Throwable ie) {
                                        kvStateServer.shutdown();
                                        kvStateServer = null;
-                                       throw new IOException("Failed to start 
the Queryable State Data Server.", ie);
+                                       LOG.error("Failed to start the 
Queryable State Data Server.", ie);
                                }
                        }
 
@@ -355,7 +355,7 @@ public void start() throws IOException {
                                } catch (Throwable ie) {
                                        kvStateProxy.shutdown();
                                        kvStateProxy = null;
-                                       throw new IOException("Failed to start 
the Queryable State Client Proxy.", ie);
+                                       LOG.error("Failed to start the 
Queryable State Client Proxy.", ie);
                                }
                        }
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index c46d800bc95..f3feb2dab71 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -419,32 +419,38 @@ private static NetworkEnvironment 
createNetworkEnvironment(
 
                QueryableStateConfiguration qsConfig = 
taskManagerServicesConfiguration.getQueryableStateConfig();
 
-               int numProxyServerNetworkThreads = 
qsConfig.numProxyServerThreads() == 0 ?
+               KvStateClientProxy kvClientProxy = null;
+               KvStateServer kvStateServer = null;
+
+               if (qsConfig != null) {
+                       int numProxyServerNetworkThreads = 
qsConfig.numProxyServerThreads() == 0 ?
                                
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numProxyServerThreads();
 
-               int numProxyServerQueryThreads = 
qsConfig.numProxyQueryThreads() == 0 ?
+                       int numProxyServerQueryThreads = 
qsConfig.numProxyQueryThreads() == 0 ?
                                
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numProxyQueryThreads();
 
-               final KvStateClientProxy kvClientProxy = 
QueryableStateUtils.createKvStateClientProxy(
+
+                       kvClientProxy = 
QueryableStateUtils.createKvStateClientProxy(
                                
taskManagerServicesConfiguration.getTaskManagerAddress(),
                                qsConfig.getProxyPortRange(),
                                numProxyServerNetworkThreads,
                                numProxyServerQueryThreads,
                                new DisabledKvStateRequestStats());
 
-               int numStateServerNetworkThreads = 
qsConfig.numStateServerThreads() == 0 ?
+                       int numStateServerNetworkThreads = 
qsConfig.numStateServerThreads() == 0 ?
                                
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numStateServerThreads();
 
-               int numStateServerQueryThreads = 
qsConfig.numStateQueryThreads() == 0 ?
+                       int numStateServerQueryThreads = 
qsConfig.numStateQueryThreads() == 0 ?
                                
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numStateQueryThreads();
 
-               final KvStateServer kvStateServer = 
QueryableStateUtils.createKvStateServer(
+                       kvStateServer = QueryableStateUtils.createKvStateServer(
                                
taskManagerServicesConfiguration.getTaskManagerAddress(),
                                qsConfig.getStateServerPortRange(),
                                numStateServerNetworkThreads,
                                numStateServerQueryThreads,
                                kvStateRegistry,
                                new DisabledKvStateRequestStats());
+               }
 
                // we start the network first, to make sure it can allocate its 
buffers first
                return new NetworkEnvironment(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index c27e54d7649..5acbddebfed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -39,6 +39,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
@@ -66,6 +67,7 @@
 
        private final NetworkEnvironmentConfiguration networkConfig;
 
+       @Nullable
        private final QueryableStateConfiguration queryableStateConfig;
 
        /**
@@ -93,7 +95,7 @@ public TaskManagerServicesConfiguration(
                        String[] localRecoveryStateRootDirectories,
                        boolean localRecoveryEnabled,
                        NetworkEnvironmentConfiguration networkConfig,
-                       QueryableStateConfiguration queryableStateConfig,
+                       @Nullable QueryableStateConfiguration 
queryableStateConfig,
                        int numberOfSlots,
                        long configuredMemory,
                        MemoryType memoryType,
@@ -107,7 +109,7 @@ public TaskManagerServicesConfiguration(
                this.localRecoveryStateRootDirectories = 
checkNotNull(localRecoveryStateRootDirectories);
                this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled);
                this.networkConfig = checkNotNull(networkConfig);
-               this.queryableStateConfig = checkNotNull(queryableStateConfig);
+               this.queryableStateConfig = queryableStateConfig;
                this.numberOfSlots = checkNotNull(numberOfSlots);
 
                this.configuredMemory = configuredMemory;
@@ -466,6 +468,9 @@ public static boolean hasNewNetworkBufConf(final 
Configuration config) {
         * Creates the {@link QueryableStateConfiguration} from the given 
Configuration.
         */
        private static QueryableStateConfiguration 
parseQueryableStateConfiguration(Configuration config) {
+               if 
(!config.getBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER)) 
{
+                       return null;
+               }
 
                final Iterator<Integer> proxyPorts = 
NetUtils.getPortRangeFromString(
                                
config.getString(QueryableStateOptions.PROXY_PORT_RANGE));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to