Updated Branches:
  refs/heads/trunk 23bfd9ed5 -> 960d03d8a

FLUME-1488. Load Balancing RPC client should support exponential backoff of 
failed nodes.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/960d03d8
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/960d03d8
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/960d03d8

Branch: refs/heads/trunk
Commit: 960d03d8a5d3b56db15dd390435a3b359f3ac4fb
Parents: 23bfd9e
Author: Mike Percy <[email protected]>
Authored: Wed Sep 12 16:24:40 2012 -0700
Committer: Mike Percy <[email protected]>
Committed: Wed Sep 12 16:24:40 2012 -0700

----------------------------------------------------------------------
 .../apache/flume/sink/AbstractSinkSelector.java    |    8 +-
 .../flume/sink/LoadBalancingSinkProcessor.java     |  196 +++-----------
 .../flume/sink/TestLoadBalancingSinkProcessor.java |   33 ++-
 flume-ng-sdk/pom.xml                               |    5 +-
 .../apache/flume/api/LoadBalancingRpcClient.java   |   99 ++++----
 .../flume/api/RpcClientConfigurationConstants.java |    4 +
 .../java/org/apache/flume/util/OrderSelector.java  |  152 +++++++++++
 .../org/apache/flume/util/RandomOrderSelector.java |   52 ++++
 .../apache/flume/util/RoundRobinOrderSelector.java |   57 ++++
 .../flume/api/TestLoadBalancingRpcClient.java      |  199 +++++++++++++++
 10 files changed, 587 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java 
b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
index 3e806a7..9ddeef4 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
@@ -33,10 +33,14 @@ public abstract class AbstractSinkSelector implements 
SinkSelector {
   // List of sinks as specified
   private List<Sink> sinkList;
 
+  protected long maxTimeOut = 0;
+
   @Override
   public void configure(Context context) {
-    // no-op configure method for convenience for implementations
-    // that do not require configuration.
+    Long timeOut = context.getLong("maxTimeOut");
+    if(timeOut != null){
+      maxTimeOut = timeOut;
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
 
b/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
index 93a46a0..2d85756 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
@@ -18,12 +18,8 @@
  */
 package org.apache.flume.sink;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Random;
 
 import org.apache.flume.Context;
 import org.apache.flume.EventDeliveryException;
@@ -32,11 +28,13 @@ import org.apache.flume.Sink;
 import org.apache.flume.Sink.Status;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.lifecycle.LifecycleAware;
-import org.apache.flume.util.SpecificOrderIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import org.apache.flume.util.OrderSelector;
+import org.apache.flume.util.RandomOrderSelector;
+import org.apache.flume.util.RoundRobinOrderSelector;
 
 /**
  * <p>Provides the ability to load-balance flow over multiple sinks.</p>
@@ -82,6 +80,7 @@ import com.google.common.base.Preconditions;
 public class LoadBalancingSinkProcessor extends AbstractSinkProcessor {
   public static final String CONFIG_SELECTOR = "selector";
   public static final String CONFIG_SELECTOR_PREFIX = CONFIG_SELECTOR + ".";
+  public static final String CONFIG_BACKOFF = "backoff";
 
   public static final String SELECTOR_NAME_ROUND_ROBIN = "ROUND_ROBIN";
   public static final String SELECTOR_NAME_RANDOM = "RANDOM";
@@ -102,16 +101,14 @@ public class LoadBalancingSinkProcessor extends 
AbstractSinkProcessor {
     String selectorTypeName = context.getString(CONFIG_SELECTOR,
         SELECTOR_NAME_ROUND_ROBIN);
 
+    Boolean shouldBackOff = context.getBoolean(CONFIG_BACKOFF, false);
+
     selector = null;
 
     if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) {
-      selector = new RoundRobinSinkSelector();
+      selector = new RoundRobinSinkSelector(shouldBackOff);
     } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
-      selector = new RandomOrderSinkSelector();
-    } else if 
(selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN_BACKOFF)) {
-      selector = new BackoffRoundRobinSinkSelector();
-    } else if 
(selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM_BACKOFF)) {
-      selector = new BackoffRandomOrderSinkSelector();
+      selector = new RandomOrderSinkSelector(shouldBackOff);
     } else {
       try {
         @SuppressWarnings("unchecked")
@@ -206,27 +203,38 @@ public class LoadBalancingSinkProcessor extends 
AbstractSinkProcessor {
    * A sink selector that implements the round-robin sink selection policy.
    * This implementation is not MT safe.
    */
+  //Unfortunately both implementations need to override the base implementation
+  //in AbstractSinkSelector class, because any custom sink selectors
+  //will break if this stuff is moved to that class.
   private static class RoundRobinSinkSelector extends AbstractSinkSelector {
 
-    private int nextHead = 0;
+    private OrderSelector<Sink> selector;
+    RoundRobinSinkSelector(boolean backoff){
+      selector = new RoundRobinOrderSelector<Sink>(backoff);
+    }
 
     @Override
-    public Iterator<Sink> createSinkIterator() {
-
-      int size = getSinks().size();
-      int[] indexOrder = new int[size];
-
-      int begin = nextHead++;
-      if (nextHead == size) {
-        nextHead = 0;
+    public void configure(Context context){
+      super.configure(context);
+      if (maxTimeOut != 0) {
+        selector.setMaxTimeOut(maxTimeOut);
       }
+    }
+    @Override
+    public Iterator<Sink> createSinkIterator() {
+      return selector.createIterator();
+    }
 
-      for (int i=0; i < size; i++) {
-        indexOrder[i] = (begin + i)%size;
-      }
+    @Override
+    public void setSinks(List<Sink> sinks) {
+      selector.setObjects(sinks);
+    }
 
-      return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
+    @Override
+    public void informSinkFailed(Sink failedSink) {
+      selector.informFailure(failedSink);
     }
+
   }
 
   /**
@@ -235,153 +243,33 @@ public class LoadBalancingSinkProcessor extends 
AbstractSinkProcessor {
    */
   private static class RandomOrderSinkSelector extends AbstractSinkSelector {
 
-    private Random random = new Random(System.currentTimeMillis());
+    private OrderSelector<Sink> selector;
 
-    @Override
-    public Iterator<Sink> createSinkIterator() {
-      int size = getSinks().size();
-      int[] indexOrder = new int[size];
-
-      List<Integer> indexList = new ArrayList<Integer>();
-      for (int i=0; i<size; i++) {
-        indexList.add(i);
-      }
-
-      while (indexList.size() != 1) {
-        int pick = random.nextInt(indexList.size());
-        indexOrder[indexList.size() - 1] = indexList.remove(pick);
-      }
-
-      indexOrder[0] = indexList.get(0);
-
-      return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
+    RandomOrderSinkSelector(boolean backoff){
+      selector = new RandomOrderSelector<Sink>(backoff);
     }
-  }
-
-  private static class FailureState {
-    long lastFail;
-    long restoreTime;
-    int sequentialFails;
-  }
-
-  public static abstract class AbstractBackoffSinkSelector extends 
AbstractSinkSelector {
-    // 2 ^ 16 seconds should be more than enough for an upper limit...
-    private static final int EXP_BACKOFF_COUNTER_LIMIT = 16;
-    private static final String CONF_MAX_TIMEOUT = "maxBackoffMillis";
-    private static final long CONSIDER_SEQUENTIAL_RANGE = 2000l;
-    private static final long MAX_TIMEOUT = 30000l;
-
-    protected List<FailureState> sinkStates;
-    protected Map<Sink, FailureState> stateMap;
-    protected  long maxTimeout = MAX_TIMEOUT;
 
     @Override
     public void configure(Context context) {
       super.configure(context);
-      maxTimeout = context.getLong(CONF_MAX_TIMEOUT, MAX_TIMEOUT);
-    }
-
-    @Override
-    public void setSinks(List<Sink> sinks) {
-      super.setSinks(sinks);
-      sinkStates = new ArrayList<FailureState>();
-      stateMap = new HashMap<Sink, FailureState>();
-      for(Sink sink : sinks) {
-        FailureState state = new FailureState();
-        sinkStates.add(state);
-        stateMap.put(sink, state);
+      if (maxTimeOut != 0) {
+        selector.setMaxTimeOut(maxTimeOut);
       }
     }
 
     @Override
-    public void informSinkFailed(Sink failedSink) {
-      super.informSinkFailed(failedSink);
-      FailureState state = stateMap.get(failedSink);
-      long now = System.currentTimeMillis();
-      long delta = now - state.lastFail;
-
-      long lastBackoffLength = Math.min(MAX_TIMEOUT, 1000 * (1 << 
state.sequentialFails));
-      long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE;
-      if( allowableDiff > delta ) {
-        if(state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT)
-        state.sequentialFails++;
-      } else {
-        state.sequentialFails = 1;
-      }
-      state.lastFail = now;
-      state.restoreTime = now + Math.min(MAX_TIMEOUT, 1000 * (1 << 
state.sequentialFails));
+    public void setSinks(List<Sink> sinks) {
+      selector.setObjects(sinks);
     }
 
-  }
-
-
-  private static class BackoffRoundRobinSinkSelector extends 
AbstractBackoffSinkSelector {
-    private int nextHead = 0;
-
     @Override
     public Iterator<Sink> createSinkIterator() {
-      long curTime = System.currentTimeMillis();
-      List<Integer> activeIndices = new ArrayList<Integer>();
-      int index = 0;
-      for(FailureState state : sinkStates) {
-        if (state.restoreTime < curTime) {
-          activeIndices.add(index);
-        }
-        index++;
-      }
-
-      int size = activeIndices.size();
-      // possible that the size has shrunk so gotta adjust nextHead for that
-      if(nextHead >= size) {
-        nextHead = 0;
-      }
-      int begin = nextHead++;
-      if (nextHead == activeIndices.size()) {
-        nextHead = 0;
-      }
-
-      int[] indexOrder = new int[size];
-
-      for (int i=0; i < size; i++) {
-        indexOrder[i] = activeIndices.get((begin + i) % size);
-      }
-
-      return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
+      return selector.createIterator();
     }
-  }
-
-  /**
-   * A sink selector that implements a random sink selection policy. This
-   * implementation is not thread safe.
-   */
-  private static class BackoffRandomOrderSinkSelector extends 
AbstractBackoffSinkSelector {
-    private Random random = new Random(System.currentTimeMillis());
 
     @Override
-    public Iterator<Sink> createSinkIterator() {
-      long now = System.currentTimeMillis();
-
-      List<Integer> indexList = new ArrayList<Integer>();
-
-      int i = 0;
-      for (FailureState state : sinkStates) {
-        if(state.restoreTime < now)
-          indexList.add(i);
-        i++;
-      }
-
-      int size = indexList.size();
-      int[] indexOrder = new int[size];
-
-      while (indexList.size() != 1) {
-        int pick = random.nextInt(indexList.size());
-        indexOrder[indexList.size() - 1] = indexList.remove(pick);
-      }
-
-      indexOrder[0] = indexList.get(0);
-
-      return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
+    public void informSinkFailed(Sink failedSink) {
+      selector.informFailure(failedSink);
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
index e0705bf..7d95655 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
@@ -40,6 +40,15 @@ import org.junit.Test;
 
 public class TestLoadBalancingSinkProcessor {
 
+  private Context getContext(String selectorType, boolean backoff) {
+    Map<String, String> p = new HashMap<String, String>();
+    p.put("selector", selectorType);
+    p.put("backoff", String.valueOf(backoff));
+    Context ctx = new Context(p);
+
+    return ctx;
+  }
+
   private Context getContext(String selectorType) {
     Map<String, String> p = new HashMap<String, String>();
     p.put("selector", selectorType);
@@ -49,8 +58,8 @@ public class TestLoadBalancingSinkProcessor {
   }
 
   private LoadBalancingSinkProcessor getProcessor(
-      String selectorType, List<Sink> sinks) {
-    return getProcessor(sinks, getContext(selectorType));
+      String selectorType, List<Sink> sinks, boolean backoff) {
+    return getProcessor(sinks, getContext(selectorType, backoff));
   }
 
   private LoadBalancingSinkProcessor getProcessor(List<Sink> sinks, Context 
ctx)
@@ -130,7 +139,7 @@ public class TestLoadBalancingSinkProcessor {
     sinks.add(s2);
     sinks.add(s3);
 
-    LoadBalancingSinkProcessor lbsp = getProcessor("random", sinks);
+    LoadBalancingSinkProcessor lbsp = getProcessor("random", sinks, false);
 
     Sink.Status s = Sink.Status.READY;
     while (s != Sink.Status.BACKOFF) {
@@ -171,7 +180,7 @@ public class TestLoadBalancingSinkProcessor {
     sinks.add(s2);
     sinks.add(s3);
 
-    LoadBalancingSinkProcessor lbsp = getProcessor("random_backoff", sinks);
+    LoadBalancingSinkProcessor lbsp = getProcessor("random", sinks, true);
 
     // TODO: there is a remote possibility that s0 or s2
     // never get hit by the random assignment
@@ -227,7 +236,7 @@ public class TestLoadBalancingSinkProcessor {
     sinks.add(s2);
     sinks.add(s3);
 
-    LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks);
+    LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks, false);
 
     Status s = Status.READY;
     while (s != Status.BACKOFF) {
@@ -290,7 +299,7 @@ public class TestLoadBalancingSinkProcessor {
     sinks.add(s9);
     sinks.add(s0);
 
-    LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks);
+    LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks, false);
 
     Status s = Status.READY;
     while (s != Status.BACKOFF) {
@@ -348,7 +357,7 @@ public class TestLoadBalancingSinkProcessor {
     sinks.add(s2);
     sinks.add(s3);
 
-    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin", sinks);
+    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin", sinks, 
false);
 
     Sink.Status s = Sink.Status.READY;
     while (s != Sink.Status.BACKOFF) {
@@ -386,7 +395,7 @@ public class TestLoadBalancingSinkProcessor {
     sinks.add(s2);
     sinks.add(s3);
 
-    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks);
+    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, false);
 
     Status s = Status.READY;
     while (s != Status.BACKOFF) {
@@ -423,7 +432,7 @@ public class TestLoadBalancingSinkProcessor {
     sinks.add(s2);
     sinks.add(s3);
 
-    LoadBalancingSinkProcessor lbsp = 
getProcessor("round_robin_backoff",sinks);
+    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, true);
 
     Status s = Status.READY;
     for (int i = 0; i < 3 && s != Status.BACKOFF; i++) {
@@ -467,7 +476,7 @@ public class TestLoadBalancingSinkProcessor {
     sinks.add(s2);
     sinks.add(s3);
 
-    LoadBalancingSinkProcessor lbsp = 
getProcessor("round_robin_backoff",sinks);
+    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, true);
 
     Status s = Status.READY;
     for (int i = 0; i < 3 && s != Status.BACKOFF; i++) {
@@ -522,7 +531,7 @@ public class TestLoadBalancingSinkProcessor {
     sinks.add(s2);
     sinks.add(s3);
 
-    LoadBalancingSinkProcessor lbsp = 
getProcessor("round_robin_backoff",sinks);
+    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, true);
 
     Status s = Status.READY;
     for (int i = 0; i < 3 && s != Status.BACKOFF; i++) {
@@ -564,7 +573,7 @@ public class TestLoadBalancingSinkProcessor {
     sinks.add(s2);
     sinks.add(s3);
 
-    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks);
+    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, false);
 
     Status s = Status.READY;
     while (s != Status.BACKOFF) {

http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sdk/pom.xml b/flume-ng-sdk/pom.xml
index 75acacd..b76387f 100644
--- a/flume-ng-sdk/pom.xml
+++ b/flume-ng-sdk/pom.xml
@@ -109,6 +109,9 @@ limitations under the License.
       <groupId>org.mortbay.jetty</groupId>
       <artifactId>servlet-api</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java 
b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
index b04e0f0..42297c1 100644
--- 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
+++ 
b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java
@@ -18,19 +18,18 @@
  */
 package org.apache.flume.api;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
-import org.apache.flume.util.SpecificOrderIterator;
+import org.apache.flume.util.OrderSelector;
+import org.apache.flume.util.RandomOrderSelector;
+import org.apache.flume.util.RoundRobinOrderSelector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,12 +65,14 @@ public class LoadBalancingRpcClient extends 
AbstractRpcClient {
 
     while (it.hasNext()) {
       HostInfo host = it.next();
+      RpcClient client;
       try {
-        RpcClient client = getClient(host);
+        client = getClient(host);
         client.append(event);
         eventSent = true;
         break;
       } catch (Exception ex) {
+        selector.informFailure(host);
         LOGGER.warn("Failed to send event to host " + host, ex);
       }
     }
@@ -94,6 +95,7 @@ public class LoadBalancingRpcClient extends AbstractRpcClient 
{
         batchSent = true;
         break;
       } catch (Exception ex) {
+        selector.informFailure(host);
         LOGGER.warn("Failed to send batch to host " + host, ex);
       }
     }
@@ -144,12 +146,24 @@ public class LoadBalancingRpcClient extends 
AbstractRpcClient {
         RpcClientConfigurationConstants.CONFIG_HOST_SELECTOR,
         RpcClientConfigurationConstants.HOST_SELECTOR_ROUND_ROBIN);
 
+    boolean backoff = Boolean.valueOf(properties.getProperty(
+            RpcClientConfigurationConstants.CONFIG_BACKOFF,
+            String.valueOf(false)));
+
+    String maxBackoffStr = properties.getProperty(
+        RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF);
+
+    long maxBackoff = 0;
+    if(maxBackoffStr != null) {
+      maxBackoff = Long.parseLong(maxBackoffStr);
+    }
+
     if (lbTypeName.equalsIgnoreCase(
         RpcClientConfigurationConstants.HOST_SELECTOR_ROUND_ROBIN)) {
-      selector = new RoundRobinHostSelector();
+      selector = new RoundRobinHostSelector(backoff, maxBackoff);
     } else if (lbTypeName.equalsIgnoreCase(
         RpcClientConfigurationConstants.HOST_SELECTOR_RANDOM)) {
-      selector = new RandomOrderHostSelector();
+      selector = new RandomOrderHostSelector(backoff, maxBackoff);
     } else {
       try {
         @SuppressWarnings("unchecked")
@@ -205,6 +219,8 @@ public class LoadBalancingRpcClient extends 
AbstractRpcClient {
     void setHosts(List<HostInfo> hosts);
 
     Iterator<HostInfo> createHostIterator();
+
+    void informFailure(HostInfo failedHost);
   }
 
   /**
@@ -212,68 +228,53 @@ public class LoadBalancingRpcClient extends 
AbstractRpcClient {
    */
   private static class RoundRobinHostSelector implements HostSelector {
 
-    private int nextHead;
-
-    private List<HostInfo> hostList;
+    private OrderSelector<HostInfo> selector;
 
+    RoundRobinHostSelector(boolean backoff, long maxBackoff){
+      selector = new RoundRobinOrderSelector<HostInfo>(backoff);
+      if(maxBackoff != 0){
+        selector.setMaxTimeOut(maxBackoff);
+      }
+    }
     @Override
     public synchronized Iterator<HostInfo> createHostIterator() {
-
-      int size = hostList.size();
-      int[] indexOrder = new int[size];
-
-      int begin = nextHead++;
-      if (nextHead == size) {
-        nextHead = 0;
-      }
-
-      for (int i=0; i < size; i++) {
-        indexOrder[i] = (begin + i)%size;
-      }
-
-      return new SpecificOrderIterator<HostInfo>(indexOrder, hostList);
+      return selector.createIterator();
     }
 
     @Override
     public synchronized void setHosts(List<HostInfo> hosts) {
-      List<HostInfo> infos = new ArrayList<HostInfo>();
-      infos.addAll(hosts);
-      hostList = Collections.unmodifiableList(infos);
+      selector.setObjects(hosts);
+    }
+
+    public synchronized void informFailure(HostInfo failedHost){
+      selector.informFailure(failedHost);
     }
   }
 
   private static class RandomOrderHostSelector implements HostSelector {
 
-    private List<HostInfo> hostList;
+    private OrderSelector<HostInfo> selector;
 
-    private Random random = new Random(System.currentTimeMillis());
+    RandomOrderHostSelector(boolean backoff, Long maxBackoff) {
+      selector = new RandomOrderSelector<HostInfo>(backoff);
+      if (maxBackoff != 0) {
+        selector.setMaxTimeOut(maxBackoff);
+      }
+    }
 
     @Override
     public synchronized Iterator<HostInfo> createHostIterator() {
-      int size = hostList.size();
-      int[] indexOrder = new int[size];
-
-      List<Integer> indexList = new ArrayList<Integer>();
-      for (int i=0; i<size; i++) {
-        indexList.add(i);
-      }
-
-      while (indexList.size() != 1) {
-        int position = indexList.size();
-        int pick = random.nextInt(position);
-        indexOrder[position - 1] = indexList.remove(pick);
-      }
-
-      indexOrder[0] = indexList.get(0);
-
-      return new SpecificOrderIterator<HostInfo>(indexOrder, hostList);
+      return selector.createIterator();
     }
 
     @Override
     public synchronized void setHosts(List<HostInfo> hosts) {
-      List<HostInfo> infos = new ArrayList<HostInfo>();
-      infos.addAll(hosts);
-      hostList = Collections.unmodifiableList(infos);
+      selector.setObjects(hosts);
+    }
+
+    @Override
+    public void informFailure(HostInfo failedHost) {
+      selector.informFailure(failedHost);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
 
b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
index 72666a6..ab4c3de 100644
--- 
a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
+++ 
b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java
@@ -115,6 +115,10 @@ public final class RpcClientConfigurationConstants {
   public static final String HOST_SELECTOR_ROUND_ROBIN = "ROUND_ROBIN";
   public static final String HOST_SELECTOR_RANDOM = "RANDOM";
 
+  public static final String CONFIG_MAX_BACKOFF = "maxBackoff";
+  public static final String CONFIG_BACKOFF = "backoff";
+  public static final String DEFAULT_BACKOFF = "false";
+
   private RpcClientConfigurationConstants() {
     // disable explicit object creation
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java 
b/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java
new file mode 100644
index 0000000..d01916f
--- /dev/null
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.util;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A basic implementation of an order selector that implements a simple
+ * exponential backoff algorithm. Subclasses can use the same algorithm for
+ * backoff by simply overriding <tt>createIterator</tt> method to order the
+ * list of active sinks returned by <tt>getIndexList</tt> method. Classes
+ * instantiating subclasses of this class are expected to call 
<tt>informFailure</tt>
+ * method when an object passed to this class should be marked as failed and 
backed off.
+ *
+ * When implementing a different backoff algorithm, a subclass should
+ * minimally override <tt>informFailure</tt> and <tt>getIndexList</tt> methods.
+ *
+ * @param <T> - The class on which ordering is to be done
+ */
+public abstract class OrderSelector<T> {
+
+  private static final int EXP_BACKOFF_COUNTER_LIMIT = 16;
+  private static final long CONSIDER_SEQUENTIAL_RANGE = 2000l;
+  private static final long MAX_TIMEOUT = 30000l;
+  private final Map<T, FailureState> stateMap = Maps.newLinkedHashMap();
+  private long maxTimeout = MAX_TIMEOUT;
+  private final boolean shouldBackOff;
+
+  protected OrderSelector(boolean shouldBackOff) {
+    this.shouldBackOff = shouldBackOff;
+  }
+
+  /**
+   * Set the list of objects which this class should return in order.
+   * @param objects
+   */
+  @SuppressWarnings("unchecked")
+  public void setObjects(List<T> objects) {
+    //Order is the same as the original order.
+
+    for (T sink : objects) {
+      FailureState state = new FailureState();
+      stateMap.put(sink, state);
+    }
+  }
+
+  /**
+   * Get the list of objects to be ordered. This list is in the same order
+   * as originally passed in, not in the algorithmically reordered order.
+   * @return - list of objects to be ordered.
+   */
+  public List<T> getObjects() {
+    return Lists.newArrayList(stateMap.keySet());
+  }
+
+  /**
+   *
+   * @return - list of algorithmically ordered active sinks
+   */
+  public abstract Iterator<T> createIterator();
+
+  /**
+   * Inform this class of the failure of an object so it can be backed off.
+   * @param failedObject
+   */
+  public void informFailure(T failedObject) {
+    //If there is no backoff this method is a no-op.
+    if (!shouldBackOff) {
+      return;
+    }
+    FailureState state = stateMap.get(failedObject);
+    long now = System.currentTimeMillis();
+    long delta = now - state.lastFail;
+
+    //Should we consider this as a new failure? If the failure happened
+    //within backoff length + a grace period (failed within
+    //grace period after the component started up again, don't consider this
+    //a new sequential failure - the component might have failed again while
+    //trying to recover. If the failure is outside backedoff time + grace 
period
+    //consider it a new failure and increase the backoff length.
+    long lastBackoffLength = Math.min(maxTimeout, 1000 * (1 << 
state.sequentialFails));
+    long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE;
+    if (allowableDiff > delta) {
+      if (state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT) {
+        state.sequentialFails++;
+      }
+    } else {
+      state.sequentialFails = 1;
+    }
+    state.lastFail = now;
+    //Depending on the number of sequential failures this component had, delay
+    //its restore time. Each time it fails, delay the restore by 1000 ms,
+    //until the maxTimeOut is reached.
+    state.restoreTime = now + Math.min(maxTimeout, 1000 * (1 << 
state.sequentialFails));
+  }
+
+  /**
+   *
+   * @return - List of indices currently active objects
+   */
+  protected List<Integer> getIndexList() {
+    long now = System.currentTimeMillis();
+
+    List<Integer> indexList = Lists.newArrayList();
+
+    int i = 0;
+    for (T obj : stateMap.keySet()) {
+      if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) {
+        indexList.add(i);
+      }
+      i++;
+    }
+    return indexList;
+  }
+
+  public boolean isShouldBackOff() {
+    return shouldBackOff;
+  }
+
+  public void setMaxTimeOut(long timeout) {
+    this.maxTimeout = timeout;
+  }
+
+  public long getMaxTimeOut() {
+    return this.maxTimeout;
+  }
+
+  private static class FailureState {
+    long lastFail = 0;
+    long restoreTime = 0;
+    int sequentialFails = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/src/main/java/org/apache/flume/util/RandomOrderSelector.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/util/RandomOrderSelector.java 
b/flume-ng-sdk/src/main/java/org/apache/flume/util/RandomOrderSelector.java
new file mode 100644
index 0000000..df6fce9
--- /dev/null
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/util/RandomOrderSelector.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.util;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * An implementation of OrderSelector which returns objects in random order.
+ * Also supports backoff.
+ */
+public class RandomOrderSelector<T> extends OrderSelector<T> {
+
+  private Random random = new Random(System.currentTimeMillis());
+
+  public RandomOrderSelector(boolean shouldBackOff) {
+    super(shouldBackOff);
+  }
+
+  @Override
+  public synchronized Iterator<T> createIterator() {
+    List<Integer> indexList = getIndexList();
+
+    int size = indexList.size();
+    int[] indexOrder = new int[size];
+
+    while (indexList.size() != 1) {
+      int pick = random.nextInt(indexList.size());
+      indexOrder[indexList.size() - 1] = indexList.remove(pick);
+    }
+
+    indexOrder[0] = indexList.get(0);
+
+    return new SpecificOrderIterator<T>(indexOrder, getObjects());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/src/main/java/org/apache/flume/util/RoundRobinOrderSelector.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sdk/src/main/java/org/apache/flume/util/RoundRobinOrderSelector.java 
b/flume-ng-sdk/src/main/java/org/apache/flume/util/RoundRobinOrderSelector.java
new file mode 100644
index 0000000..02c3962
--- /dev/null
+++ 
b/flume-ng-sdk/src/main/java/org/apache/flume/util/RoundRobinOrderSelector.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.util;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * An implementation of OrderSelector which returns objects in round robin 
order.
+ * Also supports backoff.
+ */
+
+public class RoundRobinOrderSelector<T> extends OrderSelector<T> {
+
+  private int nextHead = 0;
+
+  public RoundRobinOrderSelector(boolean shouldBackOff) {
+    super(shouldBackOff);
+  }
+
+  @Override
+  public Iterator<T> createIterator() {
+    List<Integer> activeIndices = getIndexList();
+    int size = activeIndices.size();
+    // possible that the size has shrunk so gotta adjust nextHead for that
+    if (nextHead >= size) {
+      nextHead = 0;
+    }
+    int begin = nextHead++;
+    if (nextHead == activeIndices.size()) {
+      nextHead = 0;
+    }
+
+    int[] indexOrder = new int[size];
+
+    for (int i = 0; i < size; i++) {
+      indexOrder[i] = activeIndices.get((begin + i) % size);
+    }
+
+    return new SpecificOrderIterator<T>(indexOrder, getObjects());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
 
b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
index 10474cb..deb4b1f 100644
--- 
a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
+++ 
b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java
@@ -18,6 +18,8 @@
  */
 package org.apache.flume.api;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -28,10 +30,12 @@ import junit.framework.Assert;
 
 import org.apache.avro.ipc.Server;
 import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.api.RpcTestUtils.LoadBalancedAvroHandler;
 import org.apache.flume.api.RpcTestUtils.OKAvroHandler;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.avro.Status;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -382,6 +386,201 @@ public class TestLoadBalancingRpcClient {
     }
   }
 
+  @Test
+  public void testRandomBackoff() throws Exception {
+    Properties p = new Properties();
+    List<LoadBalancedAvroHandler> hosts = Lists.newArrayList();
+    List<Server> servers = Lists.newArrayList();
+    StringBuilder hostList = new StringBuilder("");
+    for(int i = 0; i < 3;i++){
+      LoadBalancedAvroHandler s = new LoadBalancedAvroHandler();
+      hosts.add(s);
+      Server srv = RpcTestUtils.startServer(s);
+      servers.add(srv);
+      String name = "h" + i;
+      p.put("hosts." + name, "127.0.0.1:" + srv.getPort());
+      hostList.append(name).append(" ");
+    }
+    p.put("hosts", hostList.toString().trim());
+    p.put("client.type", "default_loadbalance");
+    p.put("host-selector", "random");
+    p.put("backoff", "true");
+    hosts.get(0).setFailed();
+    hosts.get(2).setFailed();
+
+    RpcClient c = RpcClientFactory.getInstance(p);
+    Assert.assertTrue(c instanceof LoadBalancingRpcClient);
+
+    // TODO: there is a remote possibility that s0 or s2
+    // never get hit by the random assignment
+    // and thus not backoffed, causing the test to fail
+    for(int i=0; i < 50; i++) {
+      // a well behaved runner would always check the return.
+      c.append(EventBuilder.withBody(("test" + String.valueOf(i)).getBytes()));
+    }
+    Assert.assertEquals(50, hosts.get(1).getAppendCount());
+    Assert.assertEquals(0, hosts.get(0).getAppendCount());
+    Assert.assertEquals(0, hosts.get(2).getAppendCount());
+    hosts.get(0).setOK();
+    hosts.get(1).setFailed(); // s0 should still be backed off
+    try {
+      c.append(EventBuilder.withBody("shouldfail".getBytes()));
+      // nothing should be able to process right now
+      Assert.fail("Expected EventDeliveryException");
+    } catch (EventDeliveryException e) {
+      // this is expected
+    }
+    Thread.sleep(2500); // wait for s0 to no longer be backed off
+
+    for (int i = 0; i < 50; i++) {
+      // a well behaved runner would always check the return.
+      c.append(EventBuilder.withBody(("test" + String.valueOf(i)).getBytes()));
+    }
+    Assert.assertEquals(50, hosts.get(0).getAppendCount());
+    Assert.assertEquals(50, hosts.get(1).getAppendCount());
+    Assert.assertEquals(0, hosts.get(2).getAppendCount());
+  }
+  @Test
+  public void testRoundRobinBackoffInitialFailure() throws 
EventDeliveryException {
+    Properties p = new Properties();
+    List<LoadBalancedAvroHandler> hosts = Lists.newArrayList();
+    List<Server> servers = Lists.newArrayList();
+    StringBuilder hostList = new StringBuilder("");
+    for (int i = 0; i < 3; i++) {
+      LoadBalancedAvroHandler s = new LoadBalancedAvroHandler();
+      hosts.add(s);
+      Server srv = RpcTestUtils.startServer(s);
+      servers.add(srv);
+      String name = "h" + i;
+      p.put("hosts." + name, "127.0.0.1:" + srv.getPort());
+      hostList.append(name).append(" ");
+    }
+    p.put("hosts", hostList.toString().trim());
+    p.put("client.type", "default_loadbalance");
+    p.put("host-selector", "round_robin");
+    p.put("backoff", "true");
+
+    RpcClient c = RpcClientFactory.getInstance(p);
+    Assert.assertTrue(c instanceof LoadBalancingRpcClient);
+
+    for (int i = 0; i < 3; i++) {
+      c.append(EventBuilder.withBody("testing".getBytes()));
+    }
+    hosts.get(1).setFailed();
+    for (int i = 0; i < 3; i++) {
+      c.append(EventBuilder.withBody("testing".getBytes()));
+    }
+    hosts.get(1).setOK();
+    //This time the iterators will never have "1".
+    //So clients get in the order: 1 - 3 - 1
+    for (int i = 0; i < 3; i++) {
+      c.append(EventBuilder.withBody("testing".getBytes()));
+    }
+
+    Assert.assertEquals(1 + 2 + 1, hosts.get(0).getAppendCount());
+    Assert.assertEquals(1, hosts.get(1).getAppendCount());
+    Assert.assertEquals(1 + 1 + 2, hosts.get(2).getAppendCount());
+  }
+
+  @Test
+  public void testRoundRobinBackoffIncreasingBackoffs() throws Exception {
+    Properties p = new Properties();
+    List<LoadBalancedAvroHandler> hosts = Lists.newArrayList();
+    List<Server> servers = Lists.newArrayList();
+    StringBuilder hostList = new StringBuilder("");
+    for (int i = 0; i < 3; i++) {
+      LoadBalancedAvroHandler s = new LoadBalancedAvroHandler();
+      hosts.add(s);
+      if(i == 1) {
+        s.setFailed();
+      }
+      Server srv = RpcTestUtils.startServer(s);
+      servers.add(srv);
+      String name = "h" + i;
+      p.put("hosts." + name, "127.0.0.1:" + srv.getPort());
+      hostList.append(name).append(" ");
+    }
+    p.put("hosts", hostList.toString().trim());
+    p.put("client.type", "default_loadbalance");
+    p.put("host-selector", "round_robin");
+    p.put("backoff", "true");
+
+    RpcClient c = RpcClientFactory.getInstance(p);
+    Assert.assertTrue(c instanceof LoadBalancingRpcClient);
+
+    for (int i = 0; i < 3; i++) {
+      c.append(EventBuilder.withBody("testing".getBytes()));
+    }
+    Assert.assertEquals(0, hosts.get(1).getAppendCount());
+    Thread.sleep(2100);
+    // this should let the sink come out of backoff and get backed off  for a 
longer time
+    for (int i = 0; i < 3; i++) {
+      c.append(EventBuilder.withBody("testing".getBytes()));
+    }
+    Assert.assertEquals(0, hosts.get(1).getAppendCount());
+    hosts.get(1).setOK();
+    Thread.sleep(2100);
+    // this time it shouldn't come out of backoff yet as the timeout isn't over
+    for (int i = 0; i < 3; i++) {
+      c.append(EventBuilder.withBody("testing".getBytes()));
+
+    }
+    Assert.assertEquals(0, hosts.get(1).getAppendCount());
+    // after this s2 should be receiving events agains
+    Thread.sleep(2500);
+    int numEvents = 60;
+    for (int i = 0; i < numEvents; i++) {
+      c.append(EventBuilder.withBody("testing".getBytes()));
+    }
+
+    Assert.assertEquals( 2 + 2 + 1 + (numEvents/3), 
hosts.get(0).getAppendCount());
+    Assert.assertEquals((numEvents/3), hosts.get(1).getAppendCount());
+    Assert.assertEquals(1 + 1 + 2 + (numEvents/3), 
hosts.get(2).getAppendCount());
+  }
+
+  @Test
+  public void testRoundRobinBackoffFailureRecovery() throws 
EventDeliveryException, InterruptedException {
+    Properties p = new Properties();
+    List<LoadBalancedAvroHandler> hosts = Lists.newArrayList();
+    List<Server> servers = Lists.newArrayList();
+    StringBuilder hostList = new StringBuilder("");
+    for (int i = 0; i < 3; i++) {
+      LoadBalancedAvroHandler s = new LoadBalancedAvroHandler();
+      hosts.add(s);
+      if (i == 1) {
+        s.setFailed();
+      }
+      Server srv = RpcTestUtils.startServer(s);
+      servers.add(srv);
+      String name = "h" + i;
+      p.put("hosts." + name, "127.0.0.1:" + srv.getPort());
+      hostList.append(name).append(" ");
+    }
+    p.put("hosts", hostList.toString().trim());
+    p.put("client.type", "default_loadbalance");
+    p.put("host-selector", "round_robin");
+    p.put("backoff", "true");
+
+    RpcClient c = RpcClientFactory.getInstance(p);
+    Assert.assertTrue(c instanceof LoadBalancingRpcClient);
+
+
+    for (int i = 0; i < 3; i++) {
+      c.append(EventBuilder.withBody("recovery test".getBytes()));
+    }
+    hosts.get(1).setOK();
+    Thread.sleep(3000);
+    int numEvents = 60;
+
+    for(int i = 0; i < numEvents; i++){
+      c.append(EventBuilder.withBody("testing".getBytes()));
+    }
+
+    Assert.assertEquals(2 + (numEvents/3) , hosts.get(0).getAppendCount());
+    Assert.assertEquals(0 + (numEvents/3), hosts.get(1).getAppendCount());
+    Assert.assertEquals(1 + (numEvents/3), hosts.get(2).getAppendCount());
+  }
+
   private List<Event> getBatchedEvent(int index) {
     List<Event> result = new ArrayList<Event>();
     result.add(EventBuilder.withBody(("event: " + index).getBytes()));

Reply via email to