eaglewatcherwb closed pull request #7412: [FLINK-10866][Runtime] 1. Explicitly 
enable qs server and proxy. 2. QS
URL: https://github.com/apache/flink/pull/7412
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request once
it has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/docs/_includes/generated/queryable_state_configuration.html 
b/docs/_includes/generated/queryable_state_configuration.html
index c457c40e147..a979658c070 100644
--- a/docs/_includes/generated/queryable_state_configuration.html
+++ b/docs/_includes/generated/queryable_state_configuration.html
@@ -12,6 +12,11 @@
             <td style="word-wrap: break-word;">0</td>
             <td>Number of network (Netty's event loop) Threads for queryable 
state client.</td>
         </tr>
+        <tr>
+            <td><h5>query.enable</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Option whether the queryable state proxy and server should be 
enabled where possible and configurable.</td>
+        </tr>
         <tr>
             <td><h5>query.proxy.network-threads</h5></td>
             <td style="word-wrap: break-word;">0</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index 6a8041926b7..0411a0eeb16 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -106,6 +106,16 @@
                        .defaultValue(0)
                        .withDescription("Number of query Threads for queryable 
state server. Uses the number of slots if set to 0.");
 
+       /** Option whether the queryable state proxy and server should be 
enabled where possible and configurable.
+        *
+        * <p>Queryable state proxy and server are still experimental 
features, hence disabled unless they are enabled
+        * in user configuration. */
+       public static final ConfigOption<Boolean> 
ENABLE_QUERYABLE_STATE_PROXY_SERVER =
+               key("query.enable")
+                       .defaultValue(false)
+                       .withDescription("Option whether the queryable state 
proxy and server should be enabled where possible" +
+                               " and configurable.");
+
        // 
------------------------------------------------------------------------
        // Client Options
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d124456c213..89e23dac1b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -345,7 +345,7 @@ public void start() throws IOException {
                                } catch (Throwable ie) {
                                        kvStateServer.shutdown();
                                        kvStateServer = null;
-                                       throw new IOException("Failed to start 
the Queryable State Data Server.", ie);
+                                       LOG.error("Failed to start the 
Queryable State Data Server.", ie);
                                }
                        }
 
@@ -355,7 +355,7 @@ public void start() throws IOException {
                                } catch (Throwable ie) {
                                        kvStateProxy.shutdown();
                                        kvStateProxy = null;
-                                       throw new IOException("Failed to start 
the Queryable State Client Proxy.", ie);
+                                       LOG.error("Failed to start the 
Queryable State Client Proxy.", ie);
                                }
                        }
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index c46d800bc95..f3feb2dab71 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -419,32 +419,38 @@ private static NetworkEnvironment 
createNetworkEnvironment(
 
                QueryableStateConfiguration qsConfig = 
taskManagerServicesConfiguration.getQueryableStateConfig();
 
-               int numProxyServerNetworkThreads = 
qsConfig.numProxyServerThreads() == 0 ?
+               KvStateClientProxy kvClientProxy = null;
+               KvStateServer kvStateServer = null;
+
+               if (qsConfig != null) {
+                       int numProxyServerNetworkThreads = 
qsConfig.numProxyServerThreads() == 0 ?
                                
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numProxyServerThreads();
 
-               int numProxyServerQueryThreads = 
qsConfig.numProxyQueryThreads() == 0 ?
+                       int numProxyServerQueryThreads = 
qsConfig.numProxyQueryThreads() == 0 ?
                                
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numProxyQueryThreads();
 
-               final KvStateClientProxy kvClientProxy = 
QueryableStateUtils.createKvStateClientProxy(
+
+                       kvClientProxy = 
QueryableStateUtils.createKvStateClientProxy(
                                
taskManagerServicesConfiguration.getTaskManagerAddress(),
                                qsConfig.getProxyPortRange(),
                                numProxyServerNetworkThreads,
                                numProxyServerQueryThreads,
                                new DisabledKvStateRequestStats());
 
-               int numStateServerNetworkThreads = 
qsConfig.numStateServerThreads() == 0 ?
+                       int numStateServerNetworkThreads = 
qsConfig.numStateServerThreads() == 0 ?
                                
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numStateServerThreads();
 
-               int numStateServerQueryThreads = 
qsConfig.numStateQueryThreads() == 0 ?
+                       int numStateServerQueryThreads = 
qsConfig.numStateQueryThreads() == 0 ?
                                
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numStateQueryThreads();
 
-               final KvStateServer kvStateServer = 
QueryableStateUtils.createKvStateServer(
+                       kvStateServer = QueryableStateUtils.createKvStateServer(
                                
taskManagerServicesConfiguration.getTaskManagerAddress(),
                                qsConfig.getStateServerPortRange(),
                                numStateServerNetworkThreads,
                                numStateServerQueryThreads,
                                kvStateRegistry,
                                new DisabledKvStateRequestStats());
+               }
 
                // we start the network first, to make sure it can allocate its 
buffers first
                return new NetworkEnvironment(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index c27e54d7649..5acbddebfed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -39,6 +39,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
@@ -66,6 +67,7 @@
 
        private final NetworkEnvironmentConfiguration networkConfig;
 
+       @Nullable
        private final QueryableStateConfiguration queryableStateConfig;
 
        /**
@@ -93,7 +95,7 @@ public TaskManagerServicesConfiguration(
                        String[] localRecoveryStateRootDirectories,
                        boolean localRecoveryEnabled,
                        NetworkEnvironmentConfiguration networkConfig,
-                       QueryableStateConfiguration queryableStateConfig,
+                       @Nullable QueryableStateConfiguration 
queryableStateConfig,
                        int numberOfSlots,
                        long configuredMemory,
                        MemoryType memoryType,
@@ -107,7 +109,7 @@ public TaskManagerServicesConfiguration(
                this.localRecoveryStateRootDirectories = 
checkNotNull(localRecoveryStateRootDirectories);
                this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled);
                this.networkConfig = checkNotNull(networkConfig);
-               this.queryableStateConfig = checkNotNull(queryableStateConfig);
+               this.queryableStateConfig = queryableStateConfig;
                this.numberOfSlots = checkNotNull(numberOfSlots);
 
                this.configuredMemory = configuredMemory;
@@ -466,6 +468,9 @@ public static boolean hasNewNetworkBufConf(final 
Configuration config) {
         * Creates the {@link QueryableStateConfiguration} from the given 
Configuration.
         */
        private static QueryableStateConfiguration 
parseQueryableStateConfiguration(Configuration config) {
+               if 
(!config.getBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER)) 
{
+                       return null;
+               }
 
                final Iterator<Integer> proxyPorts = 
NetUtils.getPortRangeFromString(
                                
config.getString(QueryableStateOptions.PROXY_PORT_RANGE));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
index 5cd1a50176d..ccbd9e0b590 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
@@ -38,6 +38,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.flink.configuration.QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER;
+
 /**
  * Resource which starts a {@link MiniCluster} for testing purposes.
  */
@@ -113,6 +115,7 @@ public void after() {
 
        private void startMiniCluster() throws Exception {
                final Configuration configuration = new 
Configuration(miniClusterResourceConfiguration.getConfiguration());
+               configuration.setBoolean(ENABLE_QUERYABLE_STATE_PROXY_SERVER, 
true);
                configuration.setString(CoreOptions.TMP_DIRS, 
temporaryFolder.newFolder().getAbsolutePath());
 
                // we need to set this since a lot of test expect this because 
TestBaseUtils.startCluster()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to