zentol commented on a change in pull request #16357:
URL: https://github.com/apache/flink/pull/16357#discussion_r670507592
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatMonitorImpl.java
##########
@@ -91,13 +107,33 @@ public long getLastHeartbeat() {
}
@Override
- public void reportTargetUnreachable() {
- if (state.compareAndSet(State.RUNNING, State.UNREACHABLE)) {
- cancelTimeout();
- heartbeatListener.notifyTargetUnreachable(resourceID);
+ public void reportHeartbeatRpcFailure() {
+ final int failedRpcRequestsSinceLastSuccess =
+ numberFailedRpcRequestsSinceLastSuccess.incrementAndGet();
+
+ if (isHeartbeatRpcFailureDetectionEnabled()
+ && failedRpcRequestsSinceLastSuccess >= failedRpcRequestsUntilUnreachable) {
+ if (state.compareAndSet(State.RUNNING, State.UNREACHABLE)) {
+ LOG.debug(
+ "Mark heartbeat target {} as unreachable because {} heartbeat RPCs have failed.",
Review comment:
"...have failed in a row."?
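A minimal sketch of why "in a row" is the accurate wording, assuming the counter is reset to 0 whenever a heartbeat succeeds (the reset is not visible in this hunk; the field name `numberFailedRpcRequestsSinceLastSuccess` suggests it). `ConsecutiveRpcFailureTracker` and its `reportHeartbeat()` method are hypothetical, simplified stand-ins, not Flink's `HeartbeatMonitorImpl`:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for the failure counting in HeartbeatMonitorImpl.
final class ConsecutiveRpcFailureTracker {
    enum State { RUNNING, UNREACHABLE }

    // Threshold of consecutive failures; -1 deactivates failure detection.
    private final int failedRpcRequestsUntilUnreachable;
    private final AtomicInteger numberFailedRpcRequestsSinceLastSuccess = new AtomicInteger(0);
    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    ConsecutiveRpcFailureTracker(int failedRpcRequestsUntilUnreachable) {
        this.failedRpcRequestsUntilUnreachable = failedRpcRequestsUntilUnreachable;
    }

    private boolean isHeartbeatRpcFailureDetectionEnabled() {
        return failedRpcRequestsUntilUnreachable > 0;
    }

    // Assumption: a successful heartbeat resets the streak, so only failures in a row count.
    void reportHeartbeat() {
        numberFailedRpcRequestsSinceLastSuccess.set(0);
    }

    // Returns true only for the call that moves the target from RUNNING to UNREACHABLE.
    boolean reportHeartbeatRpcFailure() {
        final int failedRpcRequestsSinceLastSuccess =
                numberFailedRpcRequestsSinceLastSuccess.incrementAndGet();
        return isHeartbeatRpcFailureDetectionEnabled()
                && failedRpcRequestsSinceLastSuccess >= failedRpcRequestsUntilUnreachable
                && state.compareAndSet(State.RUNNING, State.UNREACHABLE);
    }

    State getState() {
        return state.get();
    }

    public static void main(String[] args) {
        ConsecutiveRpcFailureTracker tracker = new ConsecutiveRpcFailureTracker(2);
        tracker.reportHeartbeatRpcFailure(); // 1 failure
        tracker.reportHeartbeat();           // success resets the streak
        tracker.reportHeartbeatRpcFailure(); // 1 failure again
        System.out.println(tracker.getState()); // RUNNING
        tracker.reportHeartbeatRpcFailure(); // 2 failures in a row
        System.out.println(tracker.getState()); // UNREACHABLE
    }
}
```

Two failures separated by a success never reach a threshold of 2, which is what "...have failed in a row" would tell a reader of the log.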
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatMonitorImpl.java
##########
@@ -70,6 +81,11 @@
"The heartbeat timeout interval has to be larger than 0.");
this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
+ Preconditions.checkArgument(
+ failedRpcRequestsUntilUnreachable > 0 || failedRpcRequestsUntilUnreachable == -1,
+ "The number of failed heartbeat RPC requests has to be larger than 0 or -1 (deactivated).");
Review comment:
Shouldn't we check this in the HeartbeatServices when we actually access
the configuration?
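A minimal sketch of validating the value where the configuration is read, so that a misconfiguration fails once at startup instead of in every monitor constructor. A plain `Map<String, String>` stands in for Flink's `Configuration`, and `HeartbeatServicesSketch` / `fromConfiguration` are hypothetical names; the key and the default of 2 are taken from the `HeartbeatManagerOptions` hunk in this PR:

```java
import java.util.Map;

// Hypothetical sketch; a Map stands in for Flink's Configuration.
final class HeartbeatServicesSketch {
    static final String HEARTBEAT_RPC_FAILURE_THRESHOLD_KEY = "heartbeat.rpc-failure-threshold";
    static final int DEFAULT_THRESHOLD = 2;

    final int failedRpcRequestsUntilUnreachable;

    private HeartbeatServicesSketch(int failedRpcRequestsUntilUnreachable) {
        this.failedRpcRequestsUntilUnreachable = failedRpcRequestsUntilUnreachable;
    }

    // Reads and validates the option once; monitors can then trust the value.
    static HeartbeatServicesSketch fromConfiguration(Map<String, String> config) {
        String raw = config.get(HEARTBEAT_RPC_FAILURE_THRESHOLD_KEY);
        // NumberFormatException is itself an IllegalArgumentException.
        int threshold = raw == null ? DEFAULT_THRESHOLD : Integer.parseInt(raw.trim());
        if (threshold <= 0 && threshold != -1) {
            throw new IllegalArgumentException(String.format(
                    "The value of '%s' has to be larger than 0 or -1 (deactivated), but was %d.",
                    HEARTBEAT_RPC_FAILURE_THRESHOLD_KEY, threshold));
        }
        return new HeartbeatServicesSketch(threshold);
    }
}
```

Naming the offending key in the message also points the user at the option to fix, which the constructor-level check cannot do.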
##########
File path: flink-core/src/main/java/org/apache/flink/configuration/HeartbeatManagerOptions.java
##########
@@ -32,15 +34,40 @@
public static final ConfigOption<Long> HEARTBEAT_INTERVAL =
key("heartbeat.interval")
.defaultValue(10000L)
- .withDescription("Time interval for requesting heartbeat from sender side.");
+ .withDescription(
+ "Time interval between heartbeat RPC requests from the sender to the receiver side.");
/** Timeout for requesting and receiving heartbeat for both sender and receiver sides. */
@Documentation.Section(Documentation.Sections.EXPERT_FAULT_TOLERANCE)
public static final ConfigOption<Long> HEARTBEAT_TIMEOUT =
key("heartbeat.timeout")
.defaultValue(50000L)
.withDescription(
- "Timeout for requesting and receiving heartbeat for both sender and receiver sides.");
+ "Timeout for requesting and receiving heartbeats for both sender and receiver sides.");
+
+ private static final String HEARTBEAT_RPC_FAILURE_THRESHOLD_KEY = "heartbeat.rpc-failure-threshold";
+
+ @Documentation.Section(Documentation.Sections.EXPERT_FAULT_TOLERANCE)
+ public static final ConfigOption<Integer> HEARTBEAT_RPC_FAILURE_THRESHOLD =
+ key(HEARTBEAT_RPC_FAILURE_THRESHOLD_KEY)
+ .intType()
+ .defaultValue(2)
+ .withDescription(
+ Description.builder()
+ .text(
+ "The number of failed heartbeat RPCs until a heartbeat target is marked as unreachable. "
Review comment:
I think we should mention that N RPCs have to fail in a row for this to
trigger.
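To illustrate why the wording matters, here is a hypothetical helper (`FailureThresholdSemantics`, not part of Flink) that replays a trace of heartbeat RPC outcomes and reports when a threshold would fire under "N failures in a row" versus a naive "N failures in total" reading. It assumes, as above, that a success resets the count:

```java
// Hypothetical helper contrasting two readings of the threshold description.
final class FailureThresholdSemantics {
    // Index at which `threshold` consecutive failures are reached, or -1 if never (or deactivated).
    static int triggerIndexConsecutive(boolean[] rpcSucceeded, int threshold) {
        if (threshold <= 0) {
            return -1; // e.g. -1 means detection is deactivated
        }
        int streak = 0;
        for (int i = 0; i < rpcSucceeded.length; i++) {
            streak = rpcSucceeded[i] ? 0 : streak + 1; // a success breaks the streak
            if (streak >= threshold) {
                return i;
            }
        }
        return -1;
    }

    // Same, but counting all failures regardless of successes in between.
    static int triggerIndexTotal(boolean[] rpcSucceeded, int threshold) {
        if (threshold <= 0) {
            return -1;
        }
        int failures = 0;
        for (int i = 0; i < rpcSucceeded.length; i++) {
            if (!rpcSucceeded[i]) {
                failures++;
            }
            if (failures >= threshold) {
                return i;
            }
        }
        return -1;
    }
}
```

For the alternating trace fail, success, fail, success, fail with a threshold of 2, the consecutive reading never fires while the total reading fires at the third RPC. One possible wording that removes the ambiguity: "The number of consecutive failed heartbeat RPCs until a heartbeat target is marked as unreachable."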