[FLINK-8843][REST] Decouple bind REST address from advertised address. By default, bind the REST server to the wildcard address. Rename RestServerEndpoint#getRestAddress to getRestBaseUrl.
This closes #5707. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efd7336f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efd7336f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efd7336f Branch: refs/heads/master Commit: efd7336fa693a9f82b9ecfb5d81c0ef747ab7801 Parents: 0caff35 Author: gyao <[email protected]> Authored: Thu Mar 15 22:04:58 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sun Mar 18 15:58:14 2018 +0100 ---------------------------------------------------------------------- .../apache/flink/configuration/RestOptions.java | 15 +++++-- .../runtime/entrypoint/ClusterEntrypoint.java | 4 +- .../HighAvailabilityServicesUtils.java | 6 ++- .../flink/runtime/minicluster/MiniCluster.java | 4 +- .../minicluster/MiniClusterConfiguration.java | 4 ++ .../flink/runtime/rest/RestServerEndpoint.java | 46 +++++++++++--------- .../rest/RestServerEndpointConfiguration.java | 32 ++++++++++---- .../runtime/webmonitor/WebMonitorEndpoint.java | 6 +-- .../runtime/rest/RestServerEndpointITCase.java | 3 +- 9 files changed, 80 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java index 94d7977..e7421c4 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java @@ -29,12 +29,21 @@ import static org.apache.flink.configuration.ConfigOptions.key; public class RestOptions { /** - * The address that the server binds itself to / the client connects to. 
+ * The address that the server binds itself to. + */ + public static final ConfigOption<String> REST_BIND_ADDRESS = + key("rest.bind-address") + .noDefaultValue() + .withDescription("The address that the server binds itself."); + + /** + * The address that should be used by clients to connect to the server. */ public static final ConfigOption<String> REST_ADDRESS = key("rest.address") - .defaultValue("localhost") - .withDescription("The address that the server binds itself to / the client connects to."); + .noDefaultValue() + .withDeprecatedKeys(JobManagerOptions.ADDRESS.key()) + .withDescription("The address that should be used by clients to connect to the server."); /** * The port that the server listens on / the client connects to. http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 676415b..63c8072 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -330,7 +330,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { metricRegistry, this, clusterInformation, - webMonitorEndpoint.getRestAddress()); + webMonitorEndpoint.getRestBaseUrl()); jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress()); @@ -345,7 +345,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler { metricRegistry.getMetricQueryServicePath(), archivedExecutionGraphStore, this, - webMonitorEndpoint.getRestAddress()); + webMonitorEndpoint.getRestBaseUrl()); LOG.debug("Starting ResourceManager."); 
resourceManager.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java index 4f12f2b..f19a421 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -39,6 +39,8 @@ import org.apache.flink.util.ConfigurationException; import java.util.concurrent.Executor; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Utils class to instantiate {@link HighAvailabilityServices} implementations. */ @@ -97,7 +99,9 @@ public class HighAvailabilityServicesUtils { addressResolution, configuration); - final String address = configuration.getString(RestOptions.REST_ADDRESS); + final String address = checkNotNull(configuration.getString(RestOptions.REST_ADDRESS), + "%s must be set", + RestOptions.REST_ADDRESS.key()); final int port = configuration.getInteger(RestOptions.REST_PORT); final boolean enableSSL = configuration.getBoolean(SecurityOptions.SSL_ENABLED); final String protocol = enableSSL ? 
"https://" : "http://"; http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 74aa388..dfe30af 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -341,7 +341,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { dispatcherRestEndpoint.start(); - restAddressURI = new URI(dispatcherRestEndpoint.getRestAddress()); + restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl()); // bring up the dispatcher that launches JobManagers when jobs submitted LOG.info("Starting job dispatcher(s) for JobManger"); @@ -361,7 +361,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { new MemoryArchivedExecutionGraphStore(), Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, new ShutDownFatalErrorHandler(), - dispatcherRestEndpoint.getRestAddress()); + dispatcherRestEndpoint.getRestBaseUrl()); dispatcher.start(); http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java index 08af0c4..fe76694 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.util.Preconditions; @@ -167,6 +168,9 @@ public class MiniClusterConfiguration { public MiniClusterConfiguration build() { final Configuration modifiedConfiguration = new Configuration(configuration); modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); + modifiedConfiguration.setString( + RestOptions.REST_ADDRESS, + modifiedConfiguration.getString(RestOptions.REST_ADDRESS, "localhost")); return new MiniClusterConfiguration( modifiedConfiguration, http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index a3d4843..dfb01ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -73,8 +73,9 @@ public abstract class RestServerEndpoint { private final Object lock = new Object(); - private final String configuredAddress; - private final int configuredPort; + private final String restAddress; + private final String restBindAddress; + private final int restBindPort; private final SSLEngine sslEngine; private final Path uploadDir; private final int 
maxContentLength; @@ -84,14 +85,16 @@ public abstract class RestServerEndpoint { private ServerBootstrap bootstrap; private Channel serverChannel; - private String restAddress; + private String restBaseUrl; private State state = State.CREATED; public RestServerEndpoint(RestServerEndpointConfiguration configuration) throws IOException { Preconditions.checkNotNull(configuration); - this.configuredAddress = configuration.getEndpointBindAddress(); - this.configuredPort = configuration.getEndpointBindPort(); + + this.restAddress = configuration.getRestAddress(); + this.restBindAddress = configuration.getRestBindAddress(); + this.restBindPort = configuration.getRestBindPort(); this.sslEngine = configuration.getSslEngine(); this.uploadDir = configuration.getUploadDir(); @@ -101,8 +104,6 @@ public abstract class RestServerEndpoint { this.responseHeaders = configuration.getResponseHeaders(); terminationFuture = new CompletableFuture<>(); - - this.restAddress = null; } /** @@ -176,18 +177,23 @@ public abstract class RestServerEndpoint { .childHandler(initializer); final ChannelFuture channel; - if (configuredAddress == null) { - channel = bootstrap.bind(configuredPort); + if (restBindAddress == null) { + channel = bootstrap.bind(restBindPort); } else { - channel = bootstrap.bind(configuredAddress, configuredPort); + channel = bootstrap.bind(restBindAddress, restBindPort); } serverChannel = channel.syncUninterruptibly().channel(); - InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress(); - String address = bindAddress.getAddress().getHostAddress(); - int port = bindAddress.getPort(); + final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress(); + final String advertisedAddress; + if (bindAddress.getAddress().isAnyLocalAddress()) { + advertisedAddress = this.restAddress; + } else { + advertisedAddress = bindAddress.getAddress().getHostAddress(); + } + final int port = bindAddress.getPort(); - log.info("Rest endpoint 
listening at {}:{}", address, port); + log.info("Rest endpoint listening at {}:{}", advertisedAddress, port); final String protocol; @@ -197,9 +203,9 @@ public abstract class RestServerEndpoint { protocol = "http://"; } - restAddress = protocol + address + ':' + port; + restBaseUrl = protocol + advertisedAddress + ':' + port; - restAddressFuture.complete(restAddress); + restAddressFuture.complete(restBaseUrl); state = State.RUNNING; @@ -238,14 +244,14 @@ public abstract class RestServerEndpoint { } /** - * Returns the address of the REST server endpoint. + * Returns the base URL of the REST server endpoint. * - * @return REST address of this endpoint + * @return REST base URL of this endpoint */ - public String getRestAddress() { + public String getRestBaseUrl() { synchronized (lock) { Preconditions.checkState(state != State.CREATED, "The RestServerEndpoint has not been started yet."); - return restAddress; + return restBaseUrl; } } http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java index 35bd6ea..1fac08e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java @@ -45,6 +45,8 @@ import static java.util.Objects.requireNonNull; */ public final class RestServerEndpointConfiguration { + private final String restAddress; + @Nullable private final String restBindAddress; @@ -60,6 +62,7 @@ public final class RestServerEndpointConfiguration { private final Map<String, String> responseHeaders; private RestServerEndpointConfiguration( + final String 
restAddress, @Nullable String restBindAddress, int restBindPort, @Nullable SSLEngine sslEngine, @@ -69,12 +72,20 @@ public final class RestServerEndpointConfiguration { Preconditions.checkArgument(0 <= restBindPort && restBindPort < 65536, "The bing rest port " + restBindPort + " is out of range (0, 65536["); Preconditions.checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength); + this.restAddress = requireNonNull(restAddress); this.restBindAddress = restBindAddress; this.restBindPort = restBindPort; this.sslEngine = sslEngine; this.uploadDir = requireNonNull(uploadDir); this.maxContentLength = maxContentLength; - this.responseHeaders = requireNonNull(Collections.unmodifiableMap(responseHeaders)); + this.responseHeaders = Collections.unmodifiableMap(requireNonNull(responseHeaders)); + } + + /** + * @see RestOptions#REST_ADDRESS + */ + public String getRestAddress() { + return restAddress; } /** @@ -82,7 +93,7 @@ public final class RestServerEndpointConfiguration { * * @return address that the REST server endpoint should bind itself to */ - public String getEndpointBindAddress() { + public String getRestBindAddress() { return restBindAddress; } @@ -91,7 +102,7 @@ public final class RestServerEndpointConfiguration { * * @return port that the REST server endpoint should listen on */ - public int getEndpointBindPort() { + public int getRestBindPort() { return restBindPort; } @@ -136,12 +147,16 @@ public final class RestServerEndpointConfiguration { */ public static RestServerEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException { Preconditions.checkNotNull(config); - String address = config.getString(RestOptions.REST_ADDRESS); - int port = config.getInteger(RestOptions.REST_PORT); + final String restAddress = Preconditions.checkNotNull(config.getString(RestOptions.REST_ADDRESS), + "%s must be set", + RestOptions.REST_ADDRESS.key()); + + final String restBindAddress = 
config.getString(RestOptions.REST_BIND_ADDRESS); + final int port = config.getInteger(RestOptions.REST_PORT); SSLEngine sslEngine = null; - boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED); + final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED); if (enableSSL) { try { SSLContext sslContext = SSLUtils.createSSLServerContext(config); @@ -159,14 +174,15 @@ public final class RestServerEndpointConfiguration { config.getString(WebOptions.UPLOAD_DIR, config.getString(WebOptions.TMP_DIR)), "flink-web-upload-" + UUID.randomUUID()); - int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH); + final int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH); final Map<String, String> responseHeaders = Collections.singletonMap( HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, config.getString(WebOptions.ACCESS_CONTROL_ALLOW_ORIGIN)); return new RestServerEndpointConfiguration( - address, + restAddress, + restBindAddress, port, sslEngine, uploadDir, http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index dfb2fc8..50ad7eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -666,18 +666,18 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp @Override public void grantLeadership(final UUID leaderSessionID) { - log.info("{} was granted leadership with leaderSessionID={}", getRestAddress(), leaderSessionID); + log.info("{} was granted leadership with 
leaderSessionID={}", getRestBaseUrl(), leaderSessionID); leaderElectionService.confirmLeaderSessionID(leaderSessionID); } @Override public void revokeLeadership() { - log.info("{} lost leadership", getRestAddress()); + log.info("{} lost leadership", getRestBaseUrl()); } @Override public String getAddress() { - return getRestAddress(); + return getRestBaseUrl(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/efd7336f/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java index 32f3ec8..784c141 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java @@ -115,6 +115,7 @@ public class RestServerEndpointITCase extends TestLogger { public void setup() throws Exception { Configuration config = new Configuration(); config.setInteger(RestOptions.REST_PORT, 0); + config.setString(RestOptions.REST_ADDRESS, "localhost"); config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath()); config.setInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH); config.setInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH); @@ -335,7 +336,7 @@ public class RestServerEndpointITCase extends TestLogger { private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException { final HttpURLConnection connection = - (HttpURLConnection) new URL(serverEndpoint.getRestAddress() + "/upload").openConnection(); + (HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection(); connection.setDoOutput(true); 
connection.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary); return connection;
