Repository: flink Updated Branches: refs/heads/release-1.3 471263cfe -> 51818a283
Revert "[FLINK-6518] Port blobserver config parameters to ConfigOptions" This reverts commit 5e61a01bae6d05bf1d5c76bc48f0ba90bbdef752. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3da6aa83 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3da6aa83 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3da6aa83 Branch: refs/heads/release-1.3 Commit: 3da6aa8326b0dafc4271e6494ddc968312bd1b50 Parents: 471263c Author: Stefan Richter <[email protected]> Authored: Sun May 14 15:36:49 2017 +0200 Committer: Stefan Richter <[email protected]> Committed: Sun May 14 15:36:49 2017 +0200 ---------------------------------------------------------------------- .../flink/configuration/BlobServerOptions.java | 76 -------------------- .../flink/configuration/ConfigConstants.java | 40 +++++------ .../apache/flink/runtime/blob/BlobCache.java | 9 +-- .../apache/flink/runtime/blob/BlobClient.java | 5 +- .../apache/flink/runtime/blob/BlobServer.java | 22 +++--- .../flink/runtime/blob/BlobClientSslTest.java | 5 +- .../flink/runtime/blob/BlobServerRangeTest.java | 8 +-- .../jobmanager/JobManagerStartupTest.java | 4 +- 8 files changed, 44 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java deleted file mode 100644 index e27c29f..0000000 --- a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.configuration; - -import org.apache.flink.annotation.PublicEvolving; - -import static org.apache.flink.configuration.ConfigOptions.key; - -/** - * Configuration options for the BlobServer. - */ -@PublicEvolving -public class BlobServerOptions { - - /** - * The config parameter defining the storage directory to be used by the blob server. - */ - public static final ConfigOption<String> STORAGE_DIRECTORY = - key("blob.storage.directory") - .noDefaultValue(); - - /** - * The config parameter defining number of retires for failed BLOB fetches. - */ - public static final ConfigOption<Integer> FETCH_RETRIES = - key("blob.fetch.retries") - .defaultValue(5); - - /** - * The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves. - */ - public static final ConfigOption<Integer> FETCH_CONCURRENT = - key("blob.fetch.num-concurrent") - .defaultValue(50); - - /** - * The config parameter defining the backlog of BLOB fetches on the JobManager. - */ - public static final ConfigOption<Integer> FETCH_BACKLOG = - key("blob.fetch.backlog") - .defaultValue(1000); - - /** - * The config parameter defining the server port of the blob service. - * The port can either be a port, such as "9123", - * a range of ports: "50100-50200" - * or a list of ranges and or points: "50100-50200,50300-50400,51234" - * - * Setting the port to 0 will let the OS choose an available port. - */ - public static final ConfigOption<String> PORT = - key("blob.server.port") - .defaultValue("0"); - - /** - * Flag to override ssl support for the blob service transport. - */ - public static final ConfigOption<Boolean> SSL_ENABLED = - key("blob.service.ssl.enabled") - .defaultValue(true); -} http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index b5b5486..c3704be 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -139,39 +139,36 @@ public final class ConfigConstants { public static final String RESOURCE_MANAGER_IPC_PORT_KEY = "resourcemanager.rpc.port"; /** - * @deprecated use {@link BlobServerOptions#STORAGE_DIRECTORY} instead + * The config parameter defining the storage directory to be used by the blob server. */ - @Deprecated public static final String BLOB_STORAGE_DIRECTORY_KEY = "blob.storage.directory"; /** - * @deprecated use {@link BlobServerOptions#FETCH_RETRIES} instead + * The config parameter defining number of retires for failed BLOB fetches. */ - @Deprecated public static final String BLOB_FETCH_RETRIES_KEY = "blob.fetch.retries"; /** - * @deprecated use {@link BlobServerOptions#FETCH_CONCURRENT} instead + * The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves. */ - @Deprecated public static final String BLOB_FETCH_CONCURRENT_KEY = "blob.fetch.num-concurrent"; /** - * @deprecated use {@link BlobServerOptions#FETCH_BACKLOG} instead + * The config parameter defining the backlog of BLOB fetches on the JobManager */ - @Deprecated public static final String BLOB_FETCH_BACKLOG_KEY = "blob.fetch.backlog"; /** - * @deprecated use {@link BlobServerOptions#PORT} instead + * The config parameter defining the server port of the blob service. + * The port can either be a port, such as "9123", + * a range of ports: "50100-50200" + * or a list of ranges and or points: "50100-50200,50300-50400,51234" + * + * Setting the port to 0 will let the OS choose an available port. */ - @Deprecated public static final String BLOB_SERVER_PORT = "blob.server.port"; - /** - * @deprecated use {@link BlobServerOptions#SSL_ENABLED} instead - */ - @Deprecated + /** Flag to override ssl support for the blob service transport */ public static final String BLOB_SERVICE_SSL_ENABLED = "blob.service.ssl.enabled"; /** @@ -1097,33 +1094,28 @@ public final class ConfigConstants { public static final int DEFAULT_RESOURCE_MANAGER_IPC_PORT = 0; /** - * @deprecated use {@link BlobServerOptions#SSL_ENABLED} instead + * The default value to override ssl support for blob service transport */ - @Deprecated public static final boolean DEFAULT_BLOB_SERVICE_SSL_ENABLED = true; /** - * @deprecated use {@link BlobServerOptions#FETCH_RETRIES} instead + * Default number of retries for failed BLOB fetches. */ - @Deprecated public static final int DEFAULT_BLOB_FETCH_RETRIES = 5; /** - * @deprecated use {@link BlobServerOptions#FETCH_CONCURRENT} instead + * Default number of concurrent BLOB fetch operations. */ - @Deprecated public static final int DEFAULT_BLOB_FETCH_CONCURRENT = 50; /** - * @deprecated use {@link BlobServerOptions#FETCH_BACKLOG} instead + * Default BLOB fetch connection backlog. */ - @Deprecated public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000; /** - * @deprecated use {@link BlobServerOptions#PORT} instead + * Default BLOB server port. 0 means ephemeral port. */ - @Deprecated public static final String DEFAULT_BLOB_SERVER_PORT = "0"; /** http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 23c7e63..2587b15 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.blob; -import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.util.FileUtils; @@ -129,18 +129,19 @@ public final class BlobCache implements BlobService { this.blobStore = blobStore; // configure and create the storage directory - String storageDirectory = blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY); + String storageDirectory = blobClientConfig.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); LOG.info("Created BLOB cache storage directory " + storageDir); // configure the number of fetch retries - final int fetchRetries = blobClientConfig.getInteger(BlobServerOptions.FETCH_RETRIES); + final int fetchRetries = blobClientConfig.getInteger( + ConfigConstants.BLOB_FETCH_RETRIES_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES); if (fetchRetries >= 0) { this.numFetchRetries = fetchRetries; } else { LOG.warn("Invalid value for {}. System will attempt no retires on failed fetches of BLOBs.", - BlobServerOptions.FETCH_RETRIES.key()); + ConfigConstants.BLOB_FETCH_RETRIES_KEY); this.numFetchRetries = 0; } http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index 49e54a1..ea90f54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; @@ -92,7 +92,8 @@ public final class BlobClient implements Closeable { // Check if ssl is enabled SSLContext clientSSLContext = null; if (clientConfig != null && - clientConfig.getBoolean(BlobServerOptions.SSL_ENABLED)) { + clientConfig.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, + ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) { clientSSLContext = SSLUtils.createSSLClientContext(clientConfig); } http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 0e15777..8a70559 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; @@ -111,32 +111,34 @@ public class BlobServer extends Thread implements BlobService { this.blobStore = checkNotNull(blobStore); // configure and create the storage directory - String storageDirectory = config.getString(BlobServerOptions.STORAGE_DIRECTORY); + String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); LOG.info("Created BLOB server storage directory {}", storageDir); // configure the maximum number of concurrent connections - final int maxConnections = config.getInteger(BlobServerOptions.FETCH_CONCURRENT); + final int maxConnections = config.getInteger( + ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT); if (maxConnections >= 1) { this.maxConnections = maxConnections; } else { LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}", - maxConnections, BlobServerOptions.FETCH_CONCURRENT.defaultValue()); - this.maxConnections = BlobServerOptions.FETCH_CONCURRENT.defaultValue(); + maxConnections, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT); + this.maxConnections = ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT; } // configure the backlog of connections - int backlog = config.getInteger(BlobServerOptions.FETCH_BACKLOG); + int backlog = config.getInteger(ConfigConstants.BLOB_FETCH_BACKLOG_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG); if (backlog < 1) { LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}", - backlog, BlobServerOptions.FETCH_BACKLOG.defaultValue()); - backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue(); + backlog, ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG); + backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG; } this.shutdownHook = BlobUtils.addShutdownHook(this, LOG); - if (config.getBoolean(BlobServerOptions.SSL_ENABLED)) { + if (config.getBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, + ConfigConstants.DEFAULT_BLOB_SERVICE_SSL_ENABLED)) { try { serverSSLContext = SSLUtils.createSSLServerContext(config); } catch (Exception e) { @@ -146,7 +148,7 @@ public class BlobServer extends Thread implements BlobService { // ----------------------- start the server ------------------- - String serverPortRange = config.getString(BlobServerOptions.PORT); + String serverPortRange = config.getString(ConfigConstants.BLOB_SERVER_PORT, ConfigConstants.DEFAULT_BLOB_SERVER_PORT); Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange); http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index 27603d0..5054107 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -31,7 +31,6 @@ import java.security.MessageDigest; import java.util.Collections; import java.util.List; -import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.api.common.JobID; @@ -92,7 +91,7 @@ public class BlobClientSslTest { try { Configuration config = new Configuration(); config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - config.setBoolean(BlobServerOptions.SSL_ENABLED, false); + config.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false); config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); @@ -105,7 +104,7 @@ public class BlobClientSslTest { clientConfig = new Configuration(); clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false); + clientConfig.setBoolean(ConfigConstants.BLOB_SERVICE_SSL_ENABLED, false); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); clientConfig.setString(ConfigConstants.SECURITY_SSL_TRUSTSTORE_PASSWORD, "password"); } http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java index c3762aa..ea0eb94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.blob; -import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; @@ -38,7 +38,7 @@ public class BlobServerRangeTest extends TestLogger { @Test public void testOnEphemeralPort() throws IOException { Configuration conf = new Configuration(); - conf.setString(BlobServerOptions.PORT, "0"); + conf.setString(ConfigConstants.BLOB_SERVER_PORT, "0"); BlobServer srv = new BlobServer(conf); srv.shutdown(); } @@ -59,7 +59,7 @@ public class BlobServerRangeTest extends TestLogger { } Configuration conf = new Configuration(); - conf.setString(BlobServerOptions.PORT, String.valueOf(socket.getLocalPort())); + conf.setString(ConfigConstants.BLOB_SERVER_PORT, String.valueOf(socket.getLocalPort())); // this thing is going to throw an exception try { @@ -88,7 +88,7 @@ public class BlobServerRangeTest extends TestLogger { } int availablePort = NetUtils.getAvailablePort(); Configuration conf = new Configuration(); - conf.setString(BlobServerOptions.PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort); + conf.setString(ConfigConstants.BLOB_SERVER_PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort); // this thing is going to throw an exception try { http://git-wip-us.apache.org/repos/asf/flink/blob/3da6aa83/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java index a906d9b..9ac6873 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java @@ -30,7 +30,7 @@ import java.util.List; import com.google.common.io.Files; -import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.util.StartupUtils; import org.apache.flink.util.NetUtils; @@ -130,7 +130,7 @@ public class JobManagerStartupTest extends TestLogger { } Configuration failConfig = new Configuration(); String nonExistDirectory = new File(blobStorageDirectory, DOES_NOT_EXISTS_NO_SIR).getAbsolutePath(); - failConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, nonExistDirectory); + failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, nonExistDirectory); try { JobManager.runJobManager(failConfig, JobManagerMode.CLUSTER, "localhost", portNum);
