zentol commented on code in PR #22271:
URL: https://github.com/apache/flink/pull/22271#discussion_r1148989474
##########
flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java:
##########
@@ -46,7 +46,7 @@ class MessageSerializationTest {
private static RpcService akkaRpcService1;
private static RpcService akkaRpcService2;
- private static final int maxFrameSize = 32000;
+ private static final int maxFrameSize = 32768;
Review Comment:
Does artery enforce this to be a power of 2?
##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java:
##########
@@ -206,79 +203,30 @@ private static void addBaseRemoteAkkaConfig(
Configuration configuration,
int port,
int externalPort) {
- final Duration akkaAskTimeout =
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
-
- final String startupTimeout =
- TimeUtils.getStringInMillis(
- TimeUtils.parseDuration(
- configuration.getString(
- AkkaOptions.STARTUP_TIMEOUT,
- TimeUtils.getStringInMillis(
-
akkaAskTimeout.multipliedBy(10L)))));
-
final String akkaTCPTimeout =
TimeUtils.getStringInMillis(
TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)));
final String akkaFramesize =
configuration.getString(AkkaOptions.FRAMESIZE);
- final int clientSocketWorkerPoolPoolSizeMin =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
- final int clientSocketWorkerPoolPoolSizeMax =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
- final double clientSocketWorkerPoolPoolSizeFactor =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
- final int serverSocketWorkerPoolPoolSizeMin =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
- final int serverSocketWorkerPoolPoolSizeMax =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
- final double serverSocketWorkerPoolPoolSizeFactor =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
-
- final String logLifecycleEvents =
-
booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
-
- final long retryGateClosedFor =
configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);
+ final String outboundRestartBackoff =
+ configuration.getString(AkkaOptions.OUTBOUND_RESTART_BACKOFF);
akkaConfigBuilder
.add("akka {")
.add(" actor {")
- .add(" provider = \"akka.remote.RemoteActorRefProvider\"")
+ .add(" provider = remote")
.add(" }")
- .add(" remote.artery.enabled = false")
- .add(" remote.startup-timeout = " + startupTimeout)
.add(" remote.warn-about-direct-use = off")
.add(" remote.use-unsafe-remote-features-outside-cluster =
on")
- .add(" remote.classic {")
- .add(" # disable the transport failure detector by setting
very high values")
- .add(" transport-failure-detector{")
- .add(" acceptable-heartbeat-pause = 6000 s")
- .add(" heartbeat-interval = 1000 s")
- .add(" threshold = 300")
+ .add(" remote.artery {")
+ .add(" canonical.port = " + externalPort)
+ .add(" bind.port = " + port)
+ .add(" advanced {")
+ .add(" maximum-frame-size = " + akkaFramesize)
+ .add(" outbound-restart-backoff = " +
outboundRestartBackoff)
+ .add(" tcp.connection-timeout = " + akkaTCPTimeout)
.add(" }")
- .add(" enabled-transports =
[\"akka.remote.classic.netty.tcp\"]")
- .add(" netty {")
- .add(" tcp {")
- .add(" transport-class =
\"akka.remote.transport.netty.NettyTransport\"")
- .add(" port = " + externalPort)
- .add(" bind-port = " + port)
- .add(" connection-timeout = " + akkaTCPTimeout)
- .add(" maximum-frame-size = " + akkaFramesize)
- .add(" tcp-nodelay = on")
- .add(" client-socket-worker-pool {")
- .add(" pool-size-min = " +
clientSocketWorkerPoolPoolSizeMin)
- .add(" pool-size-max = " +
clientSocketWorkerPoolPoolSizeMax)
- .add(" pool-size-factor = " +
clientSocketWorkerPoolPoolSizeFactor)
- .add(" }")
- .add(" server-socket-worker-pool {")
- .add(" pool-size-min = " +
serverSocketWorkerPoolPoolSizeMin)
- .add(" pool-size-max = " +
serverSocketWorkerPoolPoolSizeMax)
- .add(" pool-size-factor = " +
serverSocketWorkerPoolPoolSizeFactor)
- .add(" }")
- .add(" }")
- .add(" }")
- .add(" log-remote-lifecycle-events = " + logLifecycleEvents)
Review Comment:
what happened to this option?
##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java:
##########
@@ -362,26 +308,18 @@ private static void addSslRemoteAkkaConfig(
akkaConfigBuilder
.add("akka {")
- .add(" remote.classic {")
- .add(" enabled-transports =
[\"akka.remote.classic.netty.ssl\"]")
- .add(" netty {")
- .add(" ssl = ${akka.remote.classic.netty.tcp}")
- .add(" ssl {")
- .add(" enable-ssl = " + akkaEnableSSL)
- .add(" ssl-engine-provider = " + sslEngineProviderName)
- .add(" security {")
- .add(" key-store = \"" + akkaSSLKeyStore + "\"")
- .add(" key-store-password = \"" +
akkaSSLKeyStorePassword + "\"")
- .add(" key-password = \"" + akkaSSLKeyPassword + "\"")
- .add(" trust-store = \"" + akkaSSLTrustStore + "\"")
- .add(" trust-store-password = \"" +
akkaSSLTrustStorePassword + "\"")
- .add(" protocol = " + akkaSSLProtocol + "")
- .add(" enabled-algorithms = " + akkaSSLAlgorithms +
"")
- .add(" random-number-generator = \"\"")
- .add(" require-mutual-authentication = on")
Review Comment:
Is this now always on, or what happened to it?
##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceConfiguration.java:
##########
@@ -46,7 +46,7 @@ private AkkaRpcServiceConfiguration(
boolean captureAskCallStack,
boolean forceRpcInvocationSerialization) {
- checkArgument(maximumFramesize > 0L, "Maximum framesize must be
positive.");
+ checkArgument(maximumFramesize >= 32768L, "Maximum framesize must be
at least 32 KiB.");
Review Comment:
new minimum enforced by artery?
##########
flink-rpc/flink-rpc-akka/pom.xml:
##########
@@ -96,11 +96,6 @@ under the License.
<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.10.6.Final</version>
Review Comment:
needs a NOTICE update
##########
flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java:
##########
@@ -178,13 +170,12 @@ public static boolean
isForceRpcInvocationSerializationEnabled(Configuration con
.defaultValue(true)
.withDescription("Exit JVM on fatal Akka errors.");
- /** Milliseconds a gate should be closed for after a remote connection was
disconnected. */
- public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR =
- ConfigOptions.key("akka.retry-gate-closed-for")
- .longType()
- .defaultValue(50L)
- .withDescription(
- "Milliseconds a gate should be closed for after a
remote connection was disconnected.");
+ /** Retry outbound connection only after this backoff. */
+ public static final ConfigOption<String> OUTBOUND_RESTART_BACKOFF =
Review Comment:
Is this a de-facto replacement for `RETRY_GATE_CLOSED_FOR`?
##########
flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaUtils.java:
##########
@@ -206,79 +203,30 @@ private static void addBaseRemoteAkkaConfig(
Configuration configuration,
int port,
int externalPort) {
- final Duration akkaAskTimeout =
configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
-
- final String startupTimeout =
- TimeUtils.getStringInMillis(
- TimeUtils.parseDuration(
- configuration.getString(
- AkkaOptions.STARTUP_TIMEOUT,
- TimeUtils.getStringInMillis(
-
akkaAskTimeout.multipliedBy(10L)))));
-
final String akkaTCPTimeout =
TimeUtils.getStringInMillis(
TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)));
final String akkaFramesize =
configuration.getString(AkkaOptions.FRAMESIZE);
- final int clientSocketWorkerPoolPoolSizeMin =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
- final int clientSocketWorkerPoolPoolSizeMax =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
- final double clientSocketWorkerPoolPoolSizeFactor =
-
configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);
- final int serverSocketWorkerPoolPoolSizeMin =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
- final int serverSocketWorkerPoolPoolSizeMax =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
- final double serverSocketWorkerPoolPoolSizeFactor =
-
configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);
-
- final String logLifecycleEvents =
-
booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
-
- final long retryGateClosedFor =
configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);
+ final String outboundRestartBackoff =
+ configuration.getString(AkkaOptions.OUTBOUND_RESTART_BACKOFF);
akkaConfigBuilder
.add("akka {")
.add(" actor {")
- .add(" provider = \"akka.remote.RemoteActorRefProvider\"")
+ .add(" provider = remote")
.add(" }")
- .add(" remote.artery.enabled = false")
- .add(" remote.startup-timeout = " + startupTimeout)
.add(" remote.warn-about-direct-use = off")
.add(" remote.use-unsafe-remote-features-outside-cluster =
on")
- .add(" remote.classic {")
- .add(" # disable the transport failure detector by setting
very high values")
- .add(" transport-failure-detector{")
- .add(" acceptable-heartbeat-pause = 6000 s")
- .add(" heartbeat-interval = 1000 s")
- .add(" threshold = 300")
+ .add(" remote.artery {")
+ .add(" canonical.port = " + externalPort)
+ .add(" bind.port = " + port)
+ .add(" advanced {")
+ .add(" maximum-frame-size = " + akkaFramesize)
+ .add(" outbound-restart-backoff = " +
outboundRestartBackoff)
+ .add(" tcp.connection-timeout = " + akkaTCPTimeout)
.add(" }")
- .add(" enabled-transports =
[\"akka.remote.classic.netty.tcp\"]")
- .add(" netty {")
- .add(" tcp {")
- .add(" transport-class =
\"akka.remote.transport.netty.NettyTransport\"")
- .add(" port = " + externalPort)
- .add(" bind-port = " + port)
- .add(" connection-timeout = " + akkaTCPTimeout)
- .add(" maximum-frame-size = " + akkaFramesize)
- .add(" tcp-nodelay = on")
- .add(" client-socket-worker-pool {")
Review Comment:
With artery there's no additional thread pool? Do we potentially need to
adjust the akka thread pools to accommodate?
##########
flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java:
##########
@@ -235,6 +254,8 @@ public static boolean
isForceRpcInvocationSerializationEnabled(Configuration con
.text("Min number of threads to cap
factor-based number to.")
.build());
+ /** @deprecated Don't use this option anymore. It has no effect on Flink.
*/
+ @Deprecated
Review Comment:
needs regeneration of the docs
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]