Updated Branches: refs/heads/trunk 23bfd9ed5 -> 960d03d8a
FLUME-1488. Load Balancing RPC client should support exponential backoff of failed nodes. (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/960d03d8 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/960d03d8 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/960d03d8 Branch: refs/heads/trunk Commit: 960d03d8a5d3b56db15dd390435a3b359f3ac4fb Parents: 23bfd9e Author: Mike Percy <[email protected]> Authored: Wed Sep 12 16:24:40 2012 -0700 Committer: Mike Percy <[email protected]> Committed: Wed Sep 12 16:24:40 2012 -0700 ---------------------------------------------------------------------- .../apache/flume/sink/AbstractSinkSelector.java | 8 +- .../flume/sink/LoadBalancingSinkProcessor.java | 196 +++----------- .../flume/sink/TestLoadBalancingSinkProcessor.java | 33 ++- flume-ng-sdk/pom.xml | 5 +- .../apache/flume/api/LoadBalancingRpcClient.java | 99 ++++---- .../flume/api/RpcClientConfigurationConstants.java | 4 + .../java/org/apache/flume/util/OrderSelector.java | 152 +++++++++++ .../org/apache/flume/util/RandomOrderSelector.java | 52 ++++ .../apache/flume/util/RoundRobinOrderSelector.java | 57 ++++ .../flume/api/TestLoadBalancingRpcClient.java | 199 +++++++++++++++ 10 files changed, 587 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java index 3e806a7..9ddeef4 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java @@ -33,10 
+33,14 @@ public abstract class AbstractSinkSelector implements SinkSelector { // List of sinks as specified private List<Sink> sinkList; + protected long maxTimeOut = 0; + @Override public void configure(Context context) { - // no-op configure method for convenience for implementations - // that do not require configuration. + Long timeOut = context.getLong("maxTimeOut"); + if(timeOut != null){ + maxTimeOut = timeOut; + } } @Override http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java index 93a46a0..2d85756 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java @@ -18,12 +18,8 @@ */ package org.apache.flume.sink; -import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Random; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; @@ -32,11 +28,13 @@ import org.apache.flume.Sink; import org.apache.flume.Sink.Status; import org.apache.flume.conf.Configurable; import org.apache.flume.lifecycle.LifecycleAware; -import org.apache.flume.util.SpecificOrderIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import org.apache.flume.util.OrderSelector; +import org.apache.flume.util.RandomOrderSelector; +import org.apache.flume.util.RoundRobinOrderSelector; /** * <p>Provides the ability to load-balance flow over multiple sinks.</p> @@ -82,6 +80,7 @@ import com.google.common.base.Preconditions; public class LoadBalancingSinkProcessor extends 
AbstractSinkProcessor { public static final String CONFIG_SELECTOR = "selector"; public static final String CONFIG_SELECTOR_PREFIX = CONFIG_SELECTOR + "."; + public static final String CONFIG_BACKOFF = "backoff"; public static final String SELECTOR_NAME_ROUND_ROBIN = "ROUND_ROBIN"; public static final String SELECTOR_NAME_RANDOM = "RANDOM"; @@ -102,16 +101,14 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor { String selectorTypeName = context.getString(CONFIG_SELECTOR, SELECTOR_NAME_ROUND_ROBIN); + Boolean shouldBackOff = context.getBoolean(CONFIG_BACKOFF, false); + selector = null; if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) { - selector = new RoundRobinSinkSelector(); + selector = new RoundRobinSinkSelector(shouldBackOff); } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) { - selector = new RandomOrderSinkSelector(); - } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN_BACKOFF)) { - selector = new BackoffRoundRobinSinkSelector(); - } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM_BACKOFF)) { - selector = new BackoffRandomOrderSinkSelector(); + selector = new RandomOrderSinkSelector(shouldBackOff); } else { try { @SuppressWarnings("unchecked") @@ -206,27 +203,38 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor { * A sink selector that implements the round-robin sink selection policy. * This implementation is not MT safe. */ + //Unfortunately both implementations need to override the base implementation + //in AbstractSinkSelector class, because any custom sink selectors + //will break if this stuff is moved to that class. 
private static class RoundRobinSinkSelector extends AbstractSinkSelector { - private int nextHead = 0; + private OrderSelector<Sink> selector; + RoundRobinSinkSelector(boolean backoff){ + selector = new RoundRobinOrderSelector<Sink>(backoff); + } @Override - public Iterator<Sink> createSinkIterator() { - - int size = getSinks().size(); - int[] indexOrder = new int[size]; - - int begin = nextHead++; - if (nextHead == size) { - nextHead = 0; + public void configure(Context context){ + super.configure(context); + if (maxTimeOut != 0) { + selector.setMaxTimeOut(maxTimeOut); } + } + @Override + public Iterator<Sink> createSinkIterator() { + return selector.createIterator(); + } - for (int i=0; i < size; i++) { - indexOrder[i] = (begin + i)%size; - } + @Override + public void setSinks(List<Sink> sinks) { + selector.setObjects(sinks); + } - return new SpecificOrderIterator<Sink>(indexOrder, getSinks()); + @Override + public void informSinkFailed(Sink failedSink) { + selector.informFailure(failedSink); } + } /** @@ -235,153 +243,33 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor { */ private static class RandomOrderSinkSelector extends AbstractSinkSelector { - private Random random = new Random(System.currentTimeMillis()); + private OrderSelector<Sink> selector; - @Override - public Iterator<Sink> createSinkIterator() { - int size = getSinks().size(); - int[] indexOrder = new int[size]; - - List<Integer> indexList = new ArrayList<Integer>(); - for (int i=0; i<size; i++) { - indexList.add(i); - } - - while (indexList.size() != 1) { - int pick = random.nextInt(indexList.size()); - indexOrder[indexList.size() - 1] = indexList.remove(pick); - } - - indexOrder[0] = indexList.get(0); - - return new SpecificOrderIterator<Sink>(indexOrder, getSinks()); + RandomOrderSinkSelector(boolean backoff){ + selector = new RandomOrderSelector<Sink>(backoff); } - } - - private static class FailureState { - long lastFail; - long restoreTime; - int sequentialFails; - } 
- - public static abstract class AbstractBackoffSinkSelector extends AbstractSinkSelector { - // 2 ^ 16 seconds should be more than enough for an upper limit... - private static final int EXP_BACKOFF_COUNTER_LIMIT = 16; - private static final String CONF_MAX_TIMEOUT = "maxBackoffMillis"; - private static final long CONSIDER_SEQUENTIAL_RANGE = 2000l; - private static final long MAX_TIMEOUT = 30000l; - - protected List<FailureState> sinkStates; - protected Map<Sink, FailureState> stateMap; - protected long maxTimeout = MAX_TIMEOUT; @Override public void configure(Context context) { super.configure(context); - maxTimeout = context.getLong(CONF_MAX_TIMEOUT, MAX_TIMEOUT); - } - - @Override - public void setSinks(List<Sink> sinks) { - super.setSinks(sinks); - sinkStates = new ArrayList<FailureState>(); - stateMap = new HashMap<Sink, FailureState>(); - for(Sink sink : sinks) { - FailureState state = new FailureState(); - sinkStates.add(state); - stateMap.put(sink, state); + if (maxTimeOut != 0) { + selector.setMaxTimeOut(maxTimeOut); } } @Override - public void informSinkFailed(Sink failedSink) { - super.informSinkFailed(failedSink); - FailureState state = stateMap.get(failedSink); - long now = System.currentTimeMillis(); - long delta = now - state.lastFail; - - long lastBackoffLength = Math.min(MAX_TIMEOUT, 1000 * (1 << state.sequentialFails)); - long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE; - if( allowableDiff > delta ) { - if(state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT) - state.sequentialFails++; - } else { - state.sequentialFails = 1; - } - state.lastFail = now; - state.restoreTime = now + Math.min(MAX_TIMEOUT, 1000 * (1 << state.sequentialFails)); + public void setSinks(List<Sink> sinks) { + selector.setObjects(sinks); } - } - - - private static class BackoffRoundRobinSinkSelector extends AbstractBackoffSinkSelector { - private int nextHead = 0; - @Override public Iterator<Sink> createSinkIterator() { - long curTime = 
System.currentTimeMillis(); - List<Integer> activeIndices = new ArrayList<Integer>(); - int index = 0; - for(FailureState state : sinkStates) { - if (state.restoreTime < curTime) { - activeIndices.add(index); - } - index++; - } - - int size = activeIndices.size(); - // possible that the size has shrunk so gotta adjust nextHead for that - if(nextHead >= size) { - nextHead = 0; - } - int begin = nextHead++; - if (nextHead == activeIndices.size()) { - nextHead = 0; - } - - int[] indexOrder = new int[size]; - - for (int i=0; i < size; i++) { - indexOrder[i] = activeIndices.get((begin + i) % size); - } - - return new SpecificOrderIterator<Sink>(indexOrder, getSinks()); + return selector.createIterator(); } - } - - /** - * A sink selector that implements a random sink selection policy. This - * implementation is not thread safe. - */ - private static class BackoffRandomOrderSinkSelector extends AbstractBackoffSinkSelector { - private Random random = new Random(System.currentTimeMillis()); @Override - public Iterator<Sink> createSinkIterator() { - long now = System.currentTimeMillis(); - - List<Integer> indexList = new ArrayList<Integer>(); - - int i = 0; - for (FailureState state : sinkStates) { - if(state.restoreTime < now) - indexList.add(i); - i++; - } - - int size = indexList.size(); - int[] indexOrder = new int[size]; - - while (indexList.size() != 1) { - int pick = random.nextInt(indexList.size()); - indexOrder[indexList.size() - 1] = indexList.remove(pick); - } - - indexOrder[0] = indexList.get(0); - - return new SpecificOrderIterator<Sink>(indexOrder, getSinks()); + public void informSinkFailed(Sink failedSink) { + selector.informFailure(failedSink); } } - } http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java index e0705bf..7d95655 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java @@ -40,6 +40,15 @@ import org.junit.Test; public class TestLoadBalancingSinkProcessor { + private Context getContext(String selectorType, boolean backoff) { + Map<String, String> p = new HashMap<String, String>(); + p.put("selector", selectorType); + p.put("backoff", String.valueOf(backoff)); + Context ctx = new Context(p); + + return ctx; + } + private Context getContext(String selectorType) { Map<String, String> p = new HashMap<String, String>(); p.put("selector", selectorType); @@ -49,8 +58,8 @@ public class TestLoadBalancingSinkProcessor { } private LoadBalancingSinkProcessor getProcessor( - String selectorType, List<Sink> sinks) { - return getProcessor(sinks, getContext(selectorType)); + String selectorType, List<Sink> sinks, boolean backoff) { + return getProcessor(sinks, getContext(selectorType, backoff)); } private LoadBalancingSinkProcessor getProcessor(List<Sink> sinks, Context ctx) @@ -130,7 +139,7 @@ public class TestLoadBalancingSinkProcessor { sinks.add(s2); sinks.add(s3); - LoadBalancingSinkProcessor lbsp = getProcessor("random", sinks); + LoadBalancingSinkProcessor lbsp = getProcessor("random", sinks, false); Sink.Status s = Sink.Status.READY; while (s != Sink.Status.BACKOFF) { @@ -171,7 +180,7 @@ public class TestLoadBalancingSinkProcessor { sinks.add(s2); sinks.add(s3); - LoadBalancingSinkProcessor lbsp = getProcessor("random_backoff", sinks); + LoadBalancingSinkProcessor lbsp = getProcessor("random", sinks, true); // TODO: there is a remote possibility that s0 or s2 // never get hit by the random assignment @@ -227,7 +236,7 @@ public class 
TestLoadBalancingSinkProcessor { sinks.add(s2); sinks.add(s3); - LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks); + LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks, false); Status s = Status.READY; while (s != Status.BACKOFF) { @@ -290,7 +299,7 @@ public class TestLoadBalancingSinkProcessor { sinks.add(s9); sinks.add(s0); - LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks); + LoadBalancingSinkProcessor lbsp = getProcessor("random",sinks, false); Status s = Status.READY; while (s != Status.BACKOFF) { @@ -348,7 +357,7 @@ public class TestLoadBalancingSinkProcessor { sinks.add(s2); sinks.add(s3); - LoadBalancingSinkProcessor lbsp = getProcessor("round_robin", sinks); + LoadBalancingSinkProcessor lbsp = getProcessor("round_robin", sinks, false); Sink.Status s = Sink.Status.READY; while (s != Sink.Status.BACKOFF) { @@ -386,7 +395,7 @@ public class TestLoadBalancingSinkProcessor { sinks.add(s2); sinks.add(s3); - LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks); + LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, false); Status s = Status.READY; while (s != Status.BACKOFF) { @@ -423,7 +432,7 @@ public class TestLoadBalancingSinkProcessor { sinks.add(s2); sinks.add(s3); - LoadBalancingSinkProcessor lbsp = getProcessor("round_robin_backoff",sinks); + LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, true); Status s = Status.READY; for (int i = 0; i < 3 && s != Status.BACKOFF; i++) { @@ -467,7 +476,7 @@ public class TestLoadBalancingSinkProcessor { sinks.add(s2); sinks.add(s3); - LoadBalancingSinkProcessor lbsp = getProcessor("round_robin_backoff",sinks); + LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, true); Status s = Status.READY; for (int i = 0; i < 3 && s != Status.BACKOFF; i++) { @@ -522,7 +531,7 @@ public class TestLoadBalancingSinkProcessor { sinks.add(s2); sinks.add(s3); - LoadBalancingSinkProcessor lbsp = getProcessor("round_robin_backoff",sinks); 
+ LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, true); Status s = Status.READY; for (int i = 0; i < 3 && s != Status.BACKOFF; i++) { @@ -564,7 +573,7 @@ public class TestLoadBalancingSinkProcessor { sinks.add(s2); sinks.add(s3); - LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks); + LoadBalancingSinkProcessor lbsp = getProcessor("round_robin",sinks, false); Status s = Status.READY; while (s != Status.BACKOFF) { http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/pom.xml b/flume-ng-sdk/pom.xml index 75acacd..b76387f 100644 --- a/flume-ng-sdk/pom.xml +++ b/flume-ng-sdk/pom.xml @@ -109,6 +109,9 @@ limitations under the License. <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api</artifactId> </dependency> - + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java index b04e0f0..42297c1 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java @@ -18,19 +18,18 @@ */ package org.apache.flume.api; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; -import 
org.apache.flume.util.SpecificOrderIterator; +import org.apache.flume.util.OrderSelector; +import org.apache.flume.util.RandomOrderSelector; +import org.apache.flume.util.RoundRobinOrderSelector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,12 +65,14 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { while (it.hasNext()) { HostInfo host = it.next(); + RpcClient client; try { - RpcClient client = getClient(host); + client = getClient(host); client.append(event); eventSent = true; break; } catch (Exception ex) { + selector.informFailure(host); LOGGER.warn("Failed to send event to host " + host, ex); } } @@ -94,6 +95,7 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { batchSent = true; break; } catch (Exception ex) { + selector.informFailure(host); LOGGER.warn("Failed to send batch to host " + host, ex); } } @@ -144,12 +146,24 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { RpcClientConfigurationConstants.CONFIG_HOST_SELECTOR, RpcClientConfigurationConstants.HOST_SELECTOR_ROUND_ROBIN); + boolean backoff = Boolean.valueOf(properties.getProperty( + RpcClientConfigurationConstants.CONFIG_BACKOFF, + String.valueOf(false))); + + String maxBackoffStr = properties.getProperty( + RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF); + + long maxBackoff = 0; + if(maxBackoffStr != null) { + maxBackoff = Long.parseLong(maxBackoffStr); + } + if (lbTypeName.equalsIgnoreCase( RpcClientConfigurationConstants.HOST_SELECTOR_ROUND_ROBIN)) { - selector = new RoundRobinHostSelector(); + selector = new RoundRobinHostSelector(backoff, maxBackoff); } else if (lbTypeName.equalsIgnoreCase( RpcClientConfigurationConstants.HOST_SELECTOR_RANDOM)) { - selector = new RandomOrderHostSelector(); + selector = new RandomOrderHostSelector(backoff, maxBackoff); } else { try { @SuppressWarnings("unchecked") @@ -205,6 +219,8 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { void setHosts(List<HostInfo> hosts); 
Iterator<HostInfo> createHostIterator(); + + void informFailure(HostInfo failedHost); } /** @@ -212,68 +228,53 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { */ private static class RoundRobinHostSelector implements HostSelector { - private int nextHead; - - private List<HostInfo> hostList; + private OrderSelector<HostInfo> selector; + RoundRobinHostSelector(boolean backoff, long maxBackoff){ + selector = new RoundRobinOrderSelector<HostInfo>(backoff); + if(maxBackoff != 0){ + selector.setMaxTimeOut(maxBackoff); + } + } @Override public synchronized Iterator<HostInfo> createHostIterator() { - - int size = hostList.size(); - int[] indexOrder = new int[size]; - - int begin = nextHead++; - if (nextHead == size) { - nextHead = 0; - } - - for (int i=0; i < size; i++) { - indexOrder[i] = (begin + i)%size; - } - - return new SpecificOrderIterator<HostInfo>(indexOrder, hostList); + return selector.createIterator(); } @Override public synchronized void setHosts(List<HostInfo> hosts) { - List<HostInfo> infos = new ArrayList<HostInfo>(); - infos.addAll(hosts); - hostList = Collections.unmodifiableList(infos); + selector.setObjects(hosts); + } + + public synchronized void informFailure(HostInfo failedHost){ + selector.informFailure(failedHost); } } private static class RandomOrderHostSelector implements HostSelector { - private List<HostInfo> hostList; + private OrderSelector<HostInfo> selector; - private Random random = new Random(System.currentTimeMillis()); + RandomOrderHostSelector(boolean backoff, Long maxBackoff) { + selector = new RandomOrderSelector<HostInfo>(backoff); + if (maxBackoff != 0) { + selector.setMaxTimeOut(maxBackoff); + } + } @Override public synchronized Iterator<HostInfo> createHostIterator() { - int size = hostList.size(); - int[] indexOrder = new int[size]; - - List<Integer> indexList = new ArrayList<Integer>(); - for (int i=0; i<size; i++) { - indexList.add(i); - } - - while (indexList.size() != 1) { - int position = 
indexList.size(); - int pick = random.nextInt(position); - indexOrder[position - 1] = indexList.remove(pick); - } - - indexOrder[0] = indexList.get(0); - - return new SpecificOrderIterator<HostInfo>(indexOrder, hostList); + return selector.createIterator(); } @Override public synchronized void setHosts(List<HostInfo> hosts) { - List<HostInfo> infos = new ArrayList<HostInfo>(); - infos.addAll(hosts); - hostList = Collections.unmodifiableList(infos); + selector.setObjects(hosts); + } + + @Override + public void informFailure(HostInfo failedHost) { + selector.informFailure(failedHost); } } http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java index 72666a6..ab4c3de 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientConfigurationConstants.java @@ -115,6 +115,10 @@ public final class RpcClientConfigurationConstants { public static final String HOST_SELECTOR_ROUND_ROBIN = "ROUND_ROBIN"; public static final String HOST_SELECTOR_RANDOM = "RANDOM"; + public static final String CONFIG_MAX_BACKOFF = "maxBackoff"; + public static final String CONFIG_BACKOFF = "backoff"; + public static final String DEFAULT_BACKOFF = "false"; + private RpcClientConfigurationConstants() { // disable explicit object creation } http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java 
b/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java new file mode 100644 index 0000000..d01916f --- /dev/null +++ b/flume-ng-sdk/src/main/java/org/apache/flume/util/OrderSelector.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.util; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A basic implementation of an order selector that implements a simple + * exponential backoff algorithm. Subclasses can use the same algorithm for + * backoff by simply overriding <tt>createIterator</tt> method to order the + * list of active sinks returned by <tt>getIndexList</tt> method. Classes + * instantiating subclasses of this class are expected to call <tt>informFailure</tt> + * method when an object passed to this class should be marked as failed and backed off. + * + * When implementing a different backoff algorithm, a subclass should + * minimally override <tt>informFailure</tt> and <tt>getIndexList</tt> methods. 
+ * + * @param <T> - The class on which ordering is to be done + */ +public abstract class OrderSelector<T> { + + private static final int EXP_BACKOFF_COUNTER_LIMIT = 16; + private static final long CONSIDER_SEQUENTIAL_RANGE = 2000l; + private static final long MAX_TIMEOUT = 30000l; + private final Map<T, FailureState> stateMap = Maps.newLinkedHashMap(); + private long maxTimeout = MAX_TIMEOUT; + private final boolean shouldBackOff; + + protected OrderSelector(boolean shouldBackOff) { + this.shouldBackOff = shouldBackOff; + } + + /** + * Set the list of objects which this class should return in order. + * @param objects + */ + @SuppressWarnings("unchecked") + public void setObjects(List<T> objects) { + //Order is the same as the original order. + + for (T sink : objects) { + FailureState state = new FailureState(); + stateMap.put(sink, state); + } + } + + /** + * Get the list of objects to be ordered. This list is in the same order + * as originally passed in, not in the algorithmically reordered order. + * @return - list of objects to be ordered. + */ + public List<T> getObjects() { + return Lists.newArrayList(stateMap.keySet()); + } + + /** + * + * @return - list of algorithmically ordered active sinks + */ + public abstract Iterator<T> createIterator(); + + /** + * Inform this class of the failure of an object so it can be backed off. + * @param failedObject + */ + public void informFailure(T failedObject) { + //If there is no backoff this method is a no-op. + if (!shouldBackOff) { + return; + } + FailureState state = stateMap.get(failedObject); + long now = System.currentTimeMillis(); + long delta = now - state.lastFail; + + //Should we consider this as a new failure? If the failure happened + //within backoff length + a grace period (failed within + //grace period after the component started up again, don't consider this + //a new sequential failure - the component might have failed again while + //trying to recover. 
If the failure is outside backedoff time + grace period + //consider it a new failure and increase the backoff length. + long lastBackoffLength = Math.min(maxTimeout, 1000 * (1 << state.sequentialFails)); + long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE; + if (allowableDiff > delta) { + if (state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT) { + state.sequentialFails++; + } + } else { + state.sequentialFails = 1; + } + state.lastFail = now; + //Depending on the number of sequential failures this component had, delay + //its restore time. Each time it fails, delay the restore by 1000 ms, + //until the maxTimeOut is reached. + state.restoreTime = now + Math.min(maxTimeout, 1000 * (1 << state.sequentialFails)); + } + + /** + * + * @return - List of indices currently active objects + */ + protected List<Integer> getIndexList() { + long now = System.currentTimeMillis(); + + List<Integer> indexList = Lists.newArrayList(); + + int i = 0; + for (T obj : stateMap.keySet()) { + if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) { + indexList.add(i); + } + i++; + } + return indexList; + } + + public boolean isShouldBackOff() { + return shouldBackOff; + } + + public void setMaxTimeOut(long timeout) { + this.maxTimeout = timeout; + } + + public long getMaxTimeOut() { + return this.maxTimeout; + } + + private static class FailureState { + long lastFail = 0; + long restoreTime = 0; + int sequentialFails = 0; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/src/main/java/org/apache/flume/util/RandomOrderSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/util/RandomOrderSelector.java b/flume-ng-sdk/src/main/java/org/apache/flume/util/RandomOrderSelector.java new file mode 100644 index 0000000..df6fce9 --- /dev/null +++ b/flume-ng-sdk/src/main/java/org/apache/flume/util/RandomOrderSelector.java @@ -0,0 +1,52 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.util; + +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +/** + * An implementation of OrderSelector which returns objects in random order. + * Also supports backoff. 
+ */ +public class RandomOrderSelector<T> extends OrderSelector<T> { + + private Random random = new Random(System.currentTimeMillis()); + + public RandomOrderSelector(boolean shouldBackOff) { + super(shouldBackOff); + } + + @Override + public synchronized Iterator<T> createIterator() { + List<Integer> indexList = getIndexList(); + + int size = indexList.size(); + int[] indexOrder = new int[size]; + + while (indexList.size() != 1) { + int pick = random.nextInt(indexList.size()); + indexOrder[indexList.size() - 1] = indexList.remove(pick); + } + + indexOrder[0] = indexList.get(0); + + return new SpecificOrderIterator<T>(indexOrder, getObjects()); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/src/main/java/org/apache/flume/util/RoundRobinOrderSelector.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/util/RoundRobinOrderSelector.java b/flume-ng-sdk/src/main/java/org/apache/flume/util/RoundRobinOrderSelector.java new file mode 100644 index 0000000..02c3962 --- /dev/null +++ b/flume-ng-sdk/src/main/java/org/apache/flume/util/RoundRobinOrderSelector.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.util; + +import java.util.Iterator; +import java.util.List; + +/** + * An implementation of OrderSelector which returns objects in round robin order. + * Also supports backoff. + */ + +public class RoundRobinOrderSelector<T> extends OrderSelector<T> { + + private int nextHead = 0; // start position of the next rotation; guarded by this + + public RoundRobinOrderSelector(boolean shouldBackOff) { + super(shouldBackOff); + } + + @Override + public synchronized Iterator<T> createIterator() { + List<Integer> activeIndices = getIndexList(); + int size = activeIndices.size(); + // The active list may have shrunk since the last call (e.g. due to backoff), so wrap nextHead. + if (nextHead >= size) { + nextHead = 0; + } + int begin = nextHead++; + if (nextHead == size) { + nextHead = 0; + } + // Rotate the active indices so that iteration starts at position begin. + int[] indexOrder = new int[size]; + + for (int i = 0; i < size; i++) { + indexOrder[i] = activeIndices.get((begin + i) % size); + } + + return new SpecificOrderIterator<T>(indexOrder, getObjects()); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/960d03d8/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java index 10474cb..deb4b1f 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java @@ -18,6 +18,8 @@ */ package org.apache.flume.api; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -28,10 +30,12 @@ import junit.framework.Assert; import org.apache.avro.ipc.Server; import org.apache.flume.Event; +import
org.apache.flume.EventDeliveryException; import org.apache.flume.FlumeException; import org.apache.flume.api.RpcTestUtils.LoadBalancedAvroHandler; import org.apache.flume.api.RpcTestUtils.OKAvroHandler; import org.apache.flume.event.EventBuilder; +import org.apache.flume.source.avro.Status; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -382,6 +386,201 @@ public class TestLoadBalancingRpcClient { } } + @Test + public void testRandomBackoff() throws Exception { + Properties p = new Properties(); + List<LoadBalancedAvroHandler> hosts = Lists.newArrayList(); + List<Server> servers = Lists.newArrayList(); + StringBuilder hostList = new StringBuilder(""); + for(int i = 0; i < 3;i++){ + LoadBalancedAvroHandler s = new LoadBalancedAvroHandler(); + hosts.add(s); + Server srv = RpcTestUtils.startServer(s); + servers.add(srv); + String name = "h" + i; + p.put("hosts." + name, "127.0.0.1:" + srv.getPort()); + hostList.append(name).append(" "); + } + p.put("hosts", hostList.toString().trim()); + p.put("client.type", "default_loadbalance"); + p.put("host-selector", "random"); + p.put("backoff", "true"); + hosts.get(0).setFailed(); + hosts.get(2).setFailed(); + + RpcClient c = RpcClientFactory.getInstance(p); + Assert.assertTrue(c instanceof LoadBalancingRpcClient); + // Scenario: h0 and h2 start out failing, so all of these events must land on h1. + // TODO: flaky. If h0 is never tried during this loop it is never backed off, + // so the "shouldfail" append further down succeeds on h0 (which is OK by + // then) and the test fails. + for(int i=0; i < 50; i++) { + // a well behaved runner would always check the return.
+ c.append(EventBuilder.withBody(("test" + String.valueOf(i)).getBytes())); + } + Assert.assertEquals(50, hosts.get(1).getAppendCount()); + Assert.assertEquals(0, hosts.get(0).getAppendCount()); + Assert.assertEquals(0, hosts.get(2).getAppendCount()); + hosts.get(0).setOK(); + hosts.get(1).setFailed(); // h0 recovered but should still be backed off; h1 now fails + try { + c.append(EventBuilder.withBody("shouldfail".getBytes())); + // h0 is backed off and h1, h2 are failing, so no host can accept this event + Assert.fail("Expected EventDeliveryException"); + } catch (EventDeliveryException e) { + // this is expected + } + Thread.sleep(2500); // wait for h0's backoff to expire + // h1 and h2 are still failing, so every event below must land on h0. + for (int i = 0; i < 50; i++) { + // a well behaved runner would always check the return. + c.append(EventBuilder.withBody(("test" + String.valueOf(i)).getBytes())); + } + Assert.assertEquals(50, hosts.get(0).getAppendCount()); + Assert.assertEquals(50, hosts.get(1).getAppendCount()); + Assert.assertEquals(0, hosts.get(2).getAppendCount()); + } + @Test + public void testRoundRobinBackoffInitialFailure() throws EventDeliveryException { + Properties p = new Properties(); + List<LoadBalancedAvroHandler> hosts = Lists.newArrayList(); + List<Server> servers = Lists.newArrayList(); + StringBuilder hostList = new StringBuilder(""); + for (int i = 0; i < 3; i++) { + LoadBalancedAvroHandler s = new LoadBalancedAvroHandler(); + hosts.add(s); + Server srv = RpcTestUtils.startServer(s); + servers.add(srv); + String name = "h" + i; + p.put("hosts."
+ name, "127.0.0.1:" + srv.getPort()); + hostList.append(name).append(" "); + } + p.put("hosts", hostList.toString().trim()); + p.put("client.type", "default_loadbalance"); + p.put("host-selector", "round_robin"); + p.put("backoff", "true"); + + RpcClient c = RpcClientFactory.getInstance(p); + Assert.assertTrue(c instanceof LoadBalancingRpcClient); + // All hosts healthy: one event each to h0, h1, h2. + for (int i = 0; i < 3; i++) { + c.append(EventBuilder.withBody("testing".getBytes())); + } + hosts.get(1).setFailed(); // h1 fails once and is backed off: events land on h0, h2, h0 + for (int i = 0; i < 3; i++) { + c.append(EventBuilder.withBody("testing".getBytes())); + } + hosts.get(1).setOK(); + // h1 is OK again but still backed off, so only h0 and h2 rotate; + // these appends land on h2, h0, h2. + for (int i = 0; i < 3; i++) { + c.append(EventBuilder.withBody("testing".getBytes())); + } + + Assert.assertEquals(1 + 2 + 1, hosts.get(0).getAppendCount()); + Assert.assertEquals(1, hosts.get(1).getAppendCount()); + Assert.assertEquals(1 + 1 + 2, hosts.get(2).getAppendCount()); + } + + @Test + public void testRoundRobinBackoffIncreasingBackoffs() throws Exception { + Properties p = new Properties(); + List<LoadBalancedAvroHandler> hosts = Lists.newArrayList(); + List<Server> servers = Lists.newArrayList(); + StringBuilder hostList = new StringBuilder(""); + for (int i = 0; i < 3; i++) { + LoadBalancedAvroHandler s = new LoadBalancedAvroHandler(); + hosts.add(s); + if(i == 1) { + s.setFailed(); + } + Server srv = RpcTestUtils.startServer(s); + servers.add(srv); + String name = "h" + i; + p.put("hosts."
+ name, "127.0.0.1:" + srv.getPort()); + hostList.append(name).append(" "); + } + p.put("hosts", hostList.toString().trim()); + p.put("client.type", "default_loadbalance"); + p.put("host-selector", "round_robin"); + p.put("backoff", "true"); + + RpcClient c = RpcClientFactory.getInstance(p); + Assert.assertTrue(c instanceof LoadBalancingRpcClient); + // h1 starts out failing, so it is backed off on its first attempt. + for (int i = 0; i < 3; i++) { + c.append(EventBuilder.withBody("testing".getBytes())); + } + Assert.assertEquals(0, hosts.get(1).getAppendCount()); + Thread.sleep(2100); + // h1's first backoff has expired: it is retried, fails again and is backed off for longer + for (int i = 0; i < 3; i++) { + c.append(EventBuilder.withBody("testing".getBytes())); + } + Assert.assertEquals(0, hosts.get(1).getAppendCount()); + hosts.get(1).setOK(); + Thread.sleep(2100); + // h1 is OK now, but its longer second backoff has not expired yet + for (int i = 0; i < 3; i++) { + c.append(EventBuilder.withBody("testing".getBytes())); + + } + Assert.assertEquals(0, hosts.get(1).getAppendCount()); + // after this sleep h1's backoff is over and it should receive events again + Thread.sleep(2500); + int numEvents = 60; + for (int i = 0; i < numEvents; i++) { + c.append(EventBuilder.withBody("testing".getBytes())); + } + + Assert.assertEquals( 2 + 2 + 1 + (numEvents/3), hosts.get(0).getAppendCount()); + Assert.assertEquals((numEvents/3), hosts.get(1).getAppendCount()); + Assert.assertEquals(1 + 1 + 2 + (numEvents/3), hosts.get(2).getAppendCount()); + } + + @Test + public void testRoundRobinBackoffFailureRecovery() throws EventDeliveryException, InterruptedException { + Properties p = new Properties(); + List<LoadBalancedAvroHandler> hosts = Lists.newArrayList(); + List<Server> servers = Lists.newArrayList(); + StringBuilder hostList = new StringBuilder(""); + for (int i = 0; i < 3; i++) { + LoadBalancedAvroHandler s = new LoadBalancedAvroHandler(); + hosts.add(s); + if (i == 1) { + s.setFailed(); + } + Server srv = RpcTestUtils.startServer(s); + servers.add(srv); +
String name = "h" + i; + p.put("hosts." + name, "127.0.0.1:" + srv.getPort()); + hostList.append(name).append(" "); + } + p.put("hosts", hostList.toString().trim()); + p.put("client.type", "default_loadbalance"); + p.put("host-selector", "round_robin"); + p.put("backoff", "true"); + + RpcClient c = RpcClientFactory.getInstance(p); + Assert.assertTrue(c instanceof LoadBalancingRpcClient); + + // h1 fails and is backed off here: events land on h0, h2, h0. + for (int i = 0; i < 3; i++) { + c.append(EventBuilder.withBody("recovery test".getBytes())); + } + hosts.get(1).setOK(); + Thread.sleep(3000); // let h1's backoff expire so it rejoins the rotation + int numEvents = 60; + + for(int i = 0; i < numEvents; i++){ + c.append(EventBuilder.withBody("testing".getBytes())); + } + + Assert.assertEquals(2 + (numEvents/3) , hosts.get(0).getAppendCount()); + Assert.assertEquals(0 + (numEvents/3), hosts.get(1).getAppendCount()); + Assert.assertEquals(1 + (numEvents/3), hosts.get(2).getAppendCount()); + } + private List<Event> getBatchedEvent(int index) { List<Event> result = new ArrayList<Event>(); result.add(EventBuilder.withBody(("event: " + index).getBytes()));
