[
https://issues.apache.org/jira/browse/FLINK-7788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16201697#comment-16201697
]
ASF GitHub Bot commented on FLINK-7788:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4814#discussion_r144237816
--- Diff:
flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
---
@@ -96,90 +113,158 @@ protected AbstractServerBase(
final Integer bindPort,
final Integer numEventLoopThreads,
final Integer numQueryThreads) {
+ this(
+ serverName,
+ bindAddress,
+ Collections.singleton(bindPort).iterator(),
+ numEventLoopThreads,
+ numQueryThreads
+ );
+ }
+
+ /**
+ * Creates the {@link AbstractServerBase}.
+ *
+ * <p>The server needs to be started via {@link #start()}.
+ *
+ * @param serverName the name of the server
+ * @param bindAddress address to bind to
+ * @param bindPortIterator port to bind to
+ * @param numEventLoopThreads number of event loop threads
+ */
+ protected AbstractServerBase(
+ final String serverName,
+ final InetAddress bindAddress,
+ final Iterator<Integer> bindPortIterator,
+ final Integer numEventLoopThreads,
+ final Integer numQueryThreads) {
- Preconditions.checkNotNull(bindAddress);
- Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536,
"Port " + bindPort + " out of valid range (0-65536).");
+ Preconditions.checkNotNull(bindPortIterator);
Preconditions.checkArgument(numEventLoopThreads >= 1,
"Non-positive number of event loop threads.");
Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive
number of query threads.");
this.serverName = Preconditions.checkNotNull(serverName);
- this.queryExecutor = createQueryExecutor(numQueryThreads);
-
- final NettyBufferPool bufferPool = new
NettyBufferPool(numEventLoopThreads);
-
- final ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat("Flink " + serverName + "
EventLoop Thread %d")
- .build();
-
- final NioEventLoopGroup nioGroup = new
NioEventLoopGroup(numEventLoopThreads, threadFactory);
-
- bootstrap = new ServerBootstrap()
- // Bind address and port
- .localAddress(bindAddress, bindPort)
- // NIO server channels
- .group(nioGroup)
- .channel(NioServerSocketChannel.class)
- // AbstractServerBase channel Options
- .option(ChannelOption.ALLOCATOR, bufferPool)
- // Child channel options
- .childOption(ChannelOption.ALLOCATOR,
bufferPool)
-
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
-
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK);
+ this.bindAddress = Preconditions.checkNotNull(bindAddress);
+ this.numEventLoopThreads = numEventLoopThreads;
+ this.numQueryThreads = numQueryThreads;
+
+ this.bindPortRange = new HashSet<>();
+ while (bindPortIterator.hasNext()) {
+ int port = bindPortIterator.next();
+ Preconditions.checkArgument(port >= 0 && port <= 65535,
+ "Invalid port configuration. Port must
be between 0 and 65535, but was " + port + ".");
+ bindPortRange.add(port);
+ }
}
/**
* Creates a thread pool for the query execution.
- *
- * @param numQueryThreads Number of query threads.
* @return Thread pool for query execution
*/
- private ExecutorService createQueryExecutor(int numQueryThreads) {
+ private ExecutorService createQueryExecutor() {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Flink " + getServerName() + "
Thread %d")
.build();
-
return Executors.newFixedThreadPool(numQueryThreads,
threadFactory);
}
+ /**
+ * Returns the thread-pool responsible for processing incoming requests.
+ */
protected ExecutorService getQueryExecutor() {
return queryExecutor;
}
+ /**
+ * Gets the name of the server. This is useful for debugging.
+ * @return The name of the server.
+ */
public String getServerName() {
return serverName;
}
+ /**
+ * Returns the {@link AbstractServerHandler handler} to be used for
+ * serving the incoming requests.
+ */
public abstract AbstractServerHandler<REQ, RESP> initializeHandler();
/**
+ * Returns the address of this server.
+ *
+ * @return AbstractServerBase address
+ * @throws IllegalStateException If server has not been started yet
+ */
+ public KvStateServerAddress getServerAddress() {
+ Preconditions.checkState(serverAddress != null, "Server " +
serverName + " has not been started.");
+ return serverAddress;
+ }
+
+ /**
* Starts the server by binding to the configured bind address
(blocking).
* @throws InterruptedException If interrupted during the bind operation
*/
public void start() throws InterruptedException {
Preconditions.checkState(serverAddress == null,
- "Server " + serverName + " has already been
started @ " + serverAddress + '.');
-
- this.handler = initializeHandler();
- bootstrap.childHandler(new ServerChannelInitializer<>(handler));
-
- Channel channel = bootstrap.bind().sync().channel();
- InetSocketAddress localAddress = (InetSocketAddress)
channel.localAddress();
- serverAddress = new
KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
+ "Server " + serverName + " already running @ "
+ serverAddress + '.');
+
+ Iterator<Integer> portIterator = bindPortRange.iterator();
+ while (portIterator.hasNext() && serverAddress == null) {
+ final int port = portIterator.next();
+ try {
+ attemptToBind(port);
+ } catch (Exception e) {
--- End diff --
Is there a more specific exception that will be thrown if the port is
taken? We could catch specifically that one in `attemptToBind()` and return
true or false depending on whether binding was succesful.
> Allow port range for queryable state client proxy.
> --------------------------------------------------
>
> Key: FLINK-7788
> URL: https://issues.apache.org/jira/browse/FLINK-7788
> Project: Flink
> Issue Type: Sub-task
> Components: Queryable State
> Affects Versions: 1.4.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>
> Currently the newly introduced queryable state client proxy can only take one
> port as a parameter to bind to. In case of multiple proxies running on one
> machine, this can result in port clashes and inability to start the
> corresponding proxies.
> This issue proposes to allow the specification of a port range, so that if
> some ports in the range are occupied, the proxy can still pick from the
> remaining free ones.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)