Repository: flink
Updated Branches:
  refs/heads/release-1.3 471263cfe -> 51818a283


Revert "[FLINK-6518] Port blobserver config parameters to ConfigOptions"

This reverts commit 5e61a01bae6d05bf1d5c76bc48f0ba90bbdef752.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3da6aa83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3da6aa83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3da6aa83

Branch: refs/heads/release-1.3
Commit: 3da6aa8326b0dafc4271e6494ddc968312bd1b50
Parents: 471263c
Author: Stefan Richter <[email protected]>
Authored: Sun May 14 15:36:49 2017 +0200
Committer: Stefan Richter <[email protected]>
Committed: Sun May 14 15:36:49 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/BlobServerOptions.java  | 76 --------------------
 .../flink/configuration/ConfigConstants.java    | 40 +++++------
 .../apache/flink/runtime/blob/BlobCache.java    |  9 +--
 .../apache/flink/runtime/blob/BlobClient.java   |  5 +-
 .../apache/flink/runtime/blob/BlobServer.java   | 22 +++---
 .../flink/runtime/blob/BlobClientSslTest.java   |  5 +-
 .../flink/runtime/blob/BlobServerRangeTest.java |  8 +--
 .../jobmanager/JobManagerStartupTest.java       |  4 +-
 8 files changed, 44 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
deleted file mode 100644
index e27c29f..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.configuration;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-import static org.apache.flink.configuration.ConfigOptions.key;
-
-/**
- * Configuration options for the BlobServer.
- */
-@PublicEvolving
-public class BlobServerOptions {
-
-       /**
-        * The config parameter defining the storage directory to be used by 
the blob server.
-        */
-       public static final ConfigOption<String> STORAGE_DIRECTORY =
-               key("blob.storage.directory")
-                       .noDefaultValue();
-
-       /**
-        * The config parameter defining number of retires for failed BLOB 
fetches.
-        */
-       public static final ConfigOption<Integer> FETCH_RETRIES =
-               key("blob.fetch.retries")
-                       .defaultValue(5);
-
-       /**
-        * The config parameter defining the maximum number of concurrent BLOB 
fetches that the JobManager serves.
-        */
-       public static final ConfigOption<Integer> FETCH_CONCURRENT =
-               key("blob.fetch.num-concurrent")
-                       .defaultValue(50);
-
-       /**
-        * The config parameter defining the backlog of BLOB fetches on the 
JobManager.
-        */
-       public static final ConfigOption<Integer> FETCH_BACKLOG =
-               key("blob.fetch.backlog")
-                       .defaultValue(1000);
-
-       /**
-        * The config parameter defining the server port of the blob service.
-        * The port can either be a port, such as "9123",
-        * a range of ports: "50100-50200"
-        * or a list of ranges and or points: "50100-50200,50300-50400,51234"
-        *
-        * Setting the port to 0 will let the OS choose an available port.
-        */
-       public static final ConfigOption<String> PORT =
-               key("blob.server.port")
-                       .defaultValue("0");
-
-       /**
-        * Flag to override ssl support for the blob service transport.
-        */
-       public static final ConfigOption<Boolean> SSL_ENABLED =
-               key("blob.service.ssl.enabled")
-                       .defaultValue(true);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index b5b5486..c3704be 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -139,39 +139,36 @@ public final class ConfigConstants {
        public static final String RESOURCE_MANAGER_IPC_PORT_KEY = 
"resourcemanager.rpc.port";
 
        /**
-        * @deprecated use {@link BlobServerOptions#STORAGE_DIRECTORY} instead
+        * The config parameter defining the storage directory to be used by 
the blob server.
         */
-       @Deprecated
        public static final String BLOB_STORAGE_DIRECTORY_KEY = 
"blob.storage.directory";
 
        /**
-        * @deprecated use {@link BlobServerOptions#FETCH_RETRIES} instead
+        * The config parameter defining number of retires for failed BLOB 
fetches.
         */
-       @Deprecated
        public static final String BLOB_FETCH_RETRIES_KEY = 
"blob.fetch.retries";
 
        /**
-        * @deprecated use {@link BlobServerOptions#FETCH_CONCURRENT} instead
+        * The config parameter defining the maximum number of concurrent BLOB 
fetches that the JobManager serves.
         */
-       @Deprecated
        public static final String BLOB_FETCH_CONCURRENT_KEY = 
"blob.fetch.num-concurrent";
 
        /**
-        * @deprecated use {@link BlobServerOptions#FETCH_BACKLOG} instead
+        * The config parameter defining the backlog of BLOB fetches on the 
JobManager
         */
-       @Deprecated
        public static final String BLOB_FETCH_BACKLOG_KEY = 
"blob.fetch.backlog";
 
        /**
-        * @deprecated use {@link BlobServerOptions#PORT} instead
+        * The config parameter defining the server port of the blob service.
+        * The port can either be a port, such as "9123",
+        * a range of ports: "50100-50200"
+        * or a list of ranges and or points: "50100-50200,50300-50400,51234"
+        *
+        * Setting the port to 0 will let the OS choose an available port.
         */
-       @Deprecated
        public static final String BLOB_SERVER_PORT = "blob.server.port";
 
-       /**
-        * @deprecated use {@link BlobServerOptions#SSL_ENABLED} instead
-        */
-       @Deprecated
+       /** Flag to override ssl support for the blob service transport */
        public static final String BLOB_SERVICE_SSL_ENABLED = 
"blob.service.ssl.enabled";
 
        /**
@@ -1097,33 +1094,28 @@ public final class ConfigConstants {
        public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0;
 
        /**
-        * @deprecated use {@link BlobServerOptions#SSL_ENABLED} instead
+        * The default value to override ssl support for blob service transport
         */
-       @Deprecated
        public static final boolean DEFAULT_BLOB_SERVICE_SSL_ENABLED = true;
 
        /**
-        * @deprecated use {@link BlobServerOptions#FETCH_RETRIES} instead
+        * Default number of retries for failed BLOB fetches.
         */
-       @Deprecated
        public static final int DEFAULT_BLOB_FETCH_RETRIES = 5;
 
        /**
-        * @deprecated use {@link BlobServerOptions#FETCH_CONCURRENT} instead
+        * Default number of concurrent BLOB fetch operations.
         */
-       @Deprecated
        public static final int DEFAULT_BLOB_FETCH_CONCURRENT = 50;
 
        /**
-        * @deprecated use {@link BlobServerOptions#FETCH_BACKLOG} instead
+        * Default BLOB fetch connection backlog.
         */
-       @Deprecated
        public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000;
 
        /**
-        * @deprecated use {@link BlobServerOptions#PORT} instead
+        * Default BLOB server port. 0 means ephemeral port.
         */
-       @Deprecated
        public static final String DEFAULT_BLOB_SERVER_PORT = "0";
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 23c7e63..2587b15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.util.FileUtils;
@@ -129,18 +129,19 @@ public final class BlobCache implements BlobService {
                this.blobStore = blobStore;
 
                // configure and create the storage directory
-               String storageDirectory = 
blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
+               String storageDirectory = 
blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
                this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
                LOG.info("Created BLOB cache storage directory " + storageDir);
 
                // configure the number of fetch retries
-               final int fetchRetries = 
blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES);
+               final int fetchRetries = blobClientConfig.getInteger(
+                       ConfigConstants.BLOB_FETCH_RETRIES_KEY, 
ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES);
                if (fetchRetries >= 0) {
                        this.numFetchRetries = fetchRetries;
                }
                else {
                        LOG.warn("Invalid value for {}. System will attempt no 
retires on failed fetches of BLOBs.",
-                               BlobServerOptions.FETCH_RETRIES.key());
+                               ConfigConstants.BLOB_FETCH_RETRIES_KEY);
                        this.numFetchRetries = 0;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 49e54a1..ea90f54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -92,7 +92,8 @@ public final class BlobClient implements Closeable {
                        // Check if ssl is enabled
                        SSLContext clientSSLContext = null;
                        if (clientConfig != null &&
-                               
clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) {
+                               
clientConfig.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
+                                               
ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
 
                                clientSSLContext = 
SSLUtils.createSSLClientContext(clientConfig);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 0e15777..8a70559 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -111,32 +111,34 @@ public class BlobServer extends Thread implements 
BlobService {
                this.blobStore = checkNotNull(blobStore);
 
                // configure and create the storage directory
-               String storageDirectory = 
config.getString(BlobServerOptions.STORAGE_DIRECTORY);
+               String storageDirectory = 
config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
                this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
                LOG.info("Created BLOB server storage directory {}", 
storageDir);
 
                // configure the maximum number of concurrent connections
-               final int maxConnections = 
config.getInteger(BlobServerOptions.FETCH_CONCURRENT);
+               final int maxConnections = config.getInteger(
+                               ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, 
ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
                if (maxConnections >= 1) {
                        this.maxConnections = maxConnections;
                }
                else {
                        LOG.warn("Invalid value for maximum connections in BLOB 
server: {}. Using default value of {}",
-                                       maxConnections, 
BlobServerOptions.FETCH_CONCURRENT.defaultValue());
-                       this.maxConnections = 
BlobServerOptions.FETCH_CONCURRENT.defaultValue();
+                                       maxConnections, 
ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
+                       this.maxConnections = 
ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT;
                }
 
                // configure the backlog of connections
-               int backlog = 
config.getInteger(BlobServerOptions.FETCH_BACKLOG);
+               int backlog = 
config.getInteger(ConfigConstants.BLOB_FETCH_BACKLOG_KEY, 
ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG);
                if (backlog < 1) {
                        LOG.warn("Invalid value for BLOB connection backlog: 
{}. Using default value of {}",
-                                       backlog, 
BlobServerOptions.FETCH_BACKLOG.defaultValue());
-                       backlog = 
BlobServerOptions.FETCH_BACKLOG.defaultValue();
+                                       backlog, 
ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG);
+                       backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
                }
 
                this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 
-               if (config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
+               if (config.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED,
+                               
ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) {
                        try {
                                serverSSLContext = 
SSLUtils.createSSLServerContext(config);
                        } catch (Exception e) {
@@ -146,7 +148,7 @@ public class BlobServer extends Thread implements 
BlobService {
 
                //  ----------------------- start the server -------------------
 
-               String serverPortRange = 
config.getString(BlobServerOptions.PORT);
+               String serverPortRange = 
config.getString(ConfigConstants.BLOB_SERVER_PORT, 
ConfigConstants.DEFAULT_BLOB_SERVER_PORT);
 
                Iterator<Integer> ports = 
NetUtils.getPortRangeFromString(serverPortRange);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 27603d0..5054107 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -31,7 +31,6 @@ import java.security.MessageDigest;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
@@ -92,7 +91,7 @@ public class BlobClientSslTest {
                try {
                        Configuration config = new Configuration();
                        config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
-                       config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
+                       
config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
                        config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
                        
config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
                        
config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
@@ -105,7 +104,7 @@ public class BlobClientSslTest {
 
                clientConfig = new Configuration();
                clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, 
true);
-               clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false);
+               
clientConfig.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false);
                clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
                
clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, 
"password");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
index c3762aa..ea0eb94 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
@@ -38,7 +38,7 @@ public class BlobServerRangeTest extends TestLogger {
        @Test
        public void testOnEphemeralPort() throws IOException {
                Configuration conf = new Configuration();
-               conf.setString(BlobServerOptions.PORT, "0");
+               conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0");
                BlobServer srv = new BlobServer(conf);
                srv.shutdown();
        }
@@ -59,7 +59,7 @@ public class BlobServerRangeTest extends TestLogger {
                }
 
                Configuration conf = new Configuration();
-               conf.setString(BlobServerOptions.PORT, 
String.valueOf(socket.getLocalPort()));
+               conf.setString(ConfigConstants.BLOB_SERVER_PORT, 
String.valueOf(socket.getLocalPort()));
 
                // this thing is going to throw an exception
                try {
@@ -88,7 +88,7 @@ public class BlobServerRangeTest extends TestLogger {
                }
                int availablePort = NetUtils.getAvailablePort();
                Configuration conf = new Configuration();
-               conf.setString(BlobServerOptions.PORT, 
sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + 
availablePort);
+               conf.setString(ConfigConstants.BLOB_SERVER_PORT, 
sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + 
availablePort);
 
                // this thing is going to throw an exception
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index a906d9b..9ac6873 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -30,7 +30,7 @@ import java.util.List;
 
 import com.google.common.io.Files;
 
-import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.StartupUtils;
 import org.apache.flink.util.NetUtils;
@@ -130,7 +130,7 @@ public class JobManagerStartupTest extends TestLogger {
                }
                Configuration failConfig = new Configuration();
                String nonExistDirectory = new File(blobStorageDirectory, 
DOES_NOT_EXISTS_NO_SIR).getAbsolutePath();
-               failConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, 
nonExistDirectory);
+               
failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, 
nonExistDirectory);
 
                try {
                        JobManager.runJobManager(failConfig, 
JobManagerMode.CLUSTER, "localhost", portNum);

Reply via email to