Updated Branches: refs/heads/flume-1.4 2dcd33c72 -> 9fb173e43
FLUME-1755: Load balancing RPC client has issues with downed hosts (Mike Percy via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9fb173e4 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9fb173e4 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9fb173e4 Branch: refs/heads/flume-1.4 Commit: 9fb173e4353bf1323d7d70b421575e1a6343df3a Parents: 2dcd33c Author: Brock Noland <[email protected]> Authored: Fri Dec 7 09:24:57 2012 -0600 Committer: Brock Noland <[email protected]> Committed: Fri Dec 7 09:25:11 2012 -0600 ---------------------------------------------------------------------- .../apache/flume/api/LoadBalancingRpcClient.java | 11 ++-- .../flume/api/TestLoadBalancingRpcClient.java | 48 ++++++++++++++- 2 files changed, 53 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/9fb173e4/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java index 42297c1..f396104 100644 --- a/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java +++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/LoadBalancingRpcClient.java @@ -65,9 +65,8 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { while (it.hasNext()) { HostInfo host = it.next(); - RpcClient client; try { - client = getClient(host); + RpcClient client = getClient(host); client.append(event); eventSent = true; break; @@ -89,8 +88,8 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { while (it.hasNext()) { HostInfo host = it.next(); - RpcClient client = getClient(host); try { + RpcClient client = getClient(host); client.appendBatch(events); batchSent = true; break; @@ -180,7 +179,9 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { selector.setHosts(hosts); } - private synchronized RpcClient getClient(HostInfo info) { + private synchronized RpcClient getClient(HostInfo info) + throws FlumeException { + String name = info.getReferenceName(); RpcClient client = clientMap.get(name); if (client == null) { @@ -199,7 +200,7 @@ public class LoadBalancingRpcClient extends AbstractRpcClient { return client; } - private RpcClient createClient(String referenceName) { + private RpcClient createClient(String referenceName) throws FlumeException { Properties props = getClientConfigurationProperties(referenceName); return RpcClientFactory.getInstance(props); } http://git-wip-us.apache.org/repos/asf/flume/blob/9fb173e4/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java index 49a69bf..9071734 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java @@ -97,6 +97,52 @@ public class TestLoadBalancingRpcClient { } } + /** + * Ensure that we can tolerate a host that is completely down. + * @throws Exception + */ + @Test + public void testTwoHostsOneDead() throws Exception { + LOGGER.info("Running testTwoHostsOneDead..."); + Server s1 = null; + RpcClient c1 = null, c2 = null; + try { + LoadBalancedAvroHandler h1 = new LoadBalancedAvroHandler(); + s1 = RpcTestUtils.startServer(h1); + // do not create a 2nd server (assume it's "down") + + Properties p = new Properties(); + p.put("hosts", "h1 h2"); + p.put("client.type", "default_loadbalance"); + p.put("hosts.h1", "127.0.0.1:" + 0); // port 0 should always be closed + p.put("hosts.h2", "127.0.0.1:" + s1.getPort()); + + // test batch API + c1 = RpcClientFactory.getInstance(p); + Assert.assertTrue(c1 instanceof LoadBalancingRpcClient); + + for (int i = 0; i < 10; i++) { + c1.appendBatch(getBatchedEvent(i)); + } + Assert.assertEquals(10, h1.getAppendBatchCount()); + + // test non-batch API + c2 = RpcClientFactory.getInstance(p); + Assert.assertTrue(c2 instanceof LoadBalancingRpcClient); + + for (int i = 0; i < 10; i++) { + c2.append(getEvent(i)); + } + Assert.assertEquals(10, h1.getAppendCount()); + + + } finally { + if (s1 != null) s1.close(); + if (c1 != null) c1.close(); + if (c2 != null) c2.close(); + } + } + @Test public void testTwoHostFailoverBatch() throws Exception { Server s1 = null, s2 = null; @@ -584,7 +630,7 @@ public class TestLoadBalancingRpcClient { private List<Event> getBatchedEvent(int index) { List<Event> result = new ArrayList<Event>(); - result.add(EventBuilder.withBody(("event: " + index).getBytes())); + result.add(getEvent(index)); return result; }
