Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2410#discussion_r76046632
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatScheduler.java
 ---
    @@ -0,0 +1,286 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.heartbeat;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +import org.apache.flink.runtime.rpc.RpcGateway;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.slf4j.Logger;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.Serializable;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * This utility class implements the basis for periodically triggering heartbeats from one 
component to another component,
    + * for example triggering heartbeats from the ResourceManager to the TaskExecutor.
    + *
    + * @param <Gateway> The type of the gateway to connect to.
    + * @param <Payload> The type of the successful heartbeat responses with 
payload.
    + */
    +public abstract class HeartbeatScheduler<Gateway extends RpcGateway, 
Payload extends Serializable> {
    +   /** default heartbeat interval time in millisecond */
    +   private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
    +
    +   /** default heartbeat timeout in millisecond */
    +   private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
    +
    +   /** default max heartbeat timeout in milliseconds (which is used 
when retrying a heartbeat) */
    +   private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
    +
    +   /** default heartbeat attempt delay after an exception has occurred */
    +   private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
    +
    +   /** default max heartbeat retry time for one heartbeat */
    +   private static final long MAX_HEARTBEAT_ATTEMPT_MILLIS = 60000;
    +
    +   private final long heartbeatInterval;
    +
    +   private final long heartbeatTimeout;
    +
    +   private final long maxHeartbeatTimeout;
    +
    +   private final long delayOnError;
    +
    +   private final long maxAttemptTime;
    +
    +   /** target gateway to receive the heartbeat and give heartbeatResponse 
*/
    +   protected final Gateway targetGateway;
    +
    +   /** the target address */
    +   private final String targetAddress;
    +
    +   /** the target gateway name */
    +   private final String targetName;
    +
    +   private final RpcService rpcService;
    +
    +   private final UUID leaderID;
    +
    +   private final Logger log;
    +
    +   private volatile boolean closed;
    +
    +   /**
    +    * @param rpcService    rpcService
    +    * @param leaderID      leader session id of current source end which 
send heartbeat
    +    * @param targetGateway target gateway which receive heartbeat and 
response
    +    * @param targetAddress target gateway address
    +    * @param targetName    target name
    +    * @param log           log
    +    */
    +   public HeartbeatScheduler(RpcService rpcService, UUID leaderID, Gateway 
targetGateway,
    +           String targetAddress, String targetName, Logger log) {
    +           this(rpcService, leaderID, targetGateway, targetAddress, 
targetName, log, INITIAL_HEARTBEAT_INTERVAL_MILLIS,
    +                   INITIAL_HEARTBEAT_TIMEOUT_MILLIS, 
MAX_HEARTBEAT_TIMEOUT_MILLIS, ERROR_HEARTBEAT_DELAY_MILLIS, 
MAX_HEARTBEAT_ATTEMPT_MILLIS);
    +   }
    +
    +   /**
    +    * @param rpcService          rpcService
    +    * @param leaderID            leader session id of current source end 
which send heartbeat
    +    * @param targetGateway       target gateway which receive heartbeat 
and response
    +    * @param targetAddress       target gateway address
    +    * @param targetName          target name
    +    * @param log                 log
    +    * @param heartbeatInterval   heartbeat interval time in millisecond
    +    * @param heartbeatTimeout    heartbeat timeout in millisecond
    +    * @param maxHeartbeatTimeout max heartbeat timeout in milliseconds
    +    * @param delayOnError        Heartbeat attempt delay after an 
exception has occurred
    +    * @param maxAttemptTime      max retry time for one heartbeat
    +    */
    +   public HeartbeatScheduler(
    +           RpcService rpcService, UUID leaderID, Gateway targetGateway,
    +           String targetAddress, String targetName, Logger log, long 
heartbeatInterval,
    +           long heartbeatTimeout, long maxHeartbeatTimeout, long 
delayOnError, long maxAttemptTime) {
    +           checkArgument(heartbeatInterval > 0, "initial heartbeat 
interval must be greater than zero");
    +           checkArgument(heartbeatTimeout > 0, "initial heartbeat timeout 
must be greater than zero");
    +           checkArgument(maxHeartbeatTimeout > 0, "maximum heartbeat 
timeout must be greater than zero");
    +           checkArgument(delayOnError >= 0, "delay on error must be 
non-negative");
    +           checkArgument(maxAttemptTime >= 0, "max attempt on error must 
be non-negative");
    +           this.rpcService = checkNotNull(rpcService);
    +           this.leaderID = checkNotNull(leaderID);
    +           this.targetGateway = checkNotNull(targetGateway);
    +           this.targetAddress = checkNotNull(targetAddress);
    +           this.targetName = checkNotNull(targetName);
    +           this.log = checkNotNull(log);
    +           this.heartbeatInterval = heartbeatInterval;
    +           this.heartbeatTimeout = heartbeatTimeout;
    +           this.maxHeartbeatTimeout = maxHeartbeatTimeout;
    +           this.delayOnError = delayOnError;
    +           this.maxAttemptTime = maxAttemptTime;
    +   }
    +
    +   /**
    +    * start to schedule heartbeat
    --- End diff --
    
    Usually comments start with a capital letter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to