Updated Branches: refs/heads/trunk efd6d26c1 -> 88980d06b
FLUME-1756. Avro client should be able to use load balancing RPC (Mike Percy via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/88980d06 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/88980d06 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/88980d06 Branch: refs/heads/trunk Commit: 88980d06b0b5b8af632055af104a4a6e03b32a62 Parents: efd6d26 Author: Hari Shreedharan <[email protected]> Authored: Wed Dec 19 17:49:02 2012 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Wed Dec 19 17:49:02 2012 -0800 ---------------------------------------------------------------------- bin/flume-ng | 15 +++-- .../apache/flume/client/avro/AvroCLIClient.java | 50 +++++++++++---- 2 files changed, 47 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/88980d06/bin/flume-ng ---------------------------------------------------------------------- diff --git a/bin/flume-ng b/bin/flume-ng index 4fd1503..ee86c95 100755 --- a/bin/flume-ng +++ b/bin/flume-ng @@ -190,12 +190,15 @@ agent options: --help,-h display help text avro-client options: - --dirname <dir> directory to stream to avro source - --host,-H <host> hostname to which events will be sent (required) - --port,-p <port> port of the avro source (required) - --filename,-F <file> text file to stream to avro source [default: std input] - --headerFile,-R <file> headerFile containing headers as key/value pairs on each new line - --help,-h display help text + --rpcProps,-P <file> RPC client properties file with server connection params + --host,-H <host> hostname to which events will be sent + --port,-p <port> port of the avro source + --dirname <dir> directory to stream to avro source + --filename,-F <file> text file to stream to avro source (default: std input) + --headerFile,-R <file> File containing event headers as key/value pairs on each new line + --help,-h display help text + + Either --rpcProps or both --host and --port must be specified. Note that if <conf> directory is specified, then it is always included first in the classpath. http://git-wip-us.apache.org/repos/asf/flume/blob/88980d06/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java index f00ce24..da23a75 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import com.google.common.base.Preconditions; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -59,6 +60,7 @@ public class AvroCLIClient { private String hostname; private int port; private String fileName; + private String rpcClientPropsFile; private String dirName; private Map<String, String> headers = new HashMap<String, String>(); private int sent; @@ -73,7 +75,7 @@ public class AvroCLIClient { } catch (ParseException e) { logger.error("Unable to parse command line options - {}", e.getMessage()); } catch (IOException e) { - logger.error("Unable to send data to Flume. Exception follows.", e); + logger.error("Unable to send data to Flume. Exception follows.", e); } catch (FlumeException e) { logger.error("Unable to open connection to Flume. Exception follows.", e); } catch (EventDeliveryException e) { @@ -120,7 +122,10 @@ public class AvroCLIClient { private boolean parseCommandLine(String[] args) throws ParseException { Options options = new Options(); - options.addOption("p", "port", true, "port of the avro source") + options + .addOption("P", "rpcProps", true, "RPC client properties file with " + + "server connection params") + .addOption("p", "port", true, "port of the avro source") .addOption("H", "host", true, "hostname of the avro source") .addOption("F", "filename", true, "file to stream to avro source") .addOption(null, "dirname", true, "directory to stream to avro source") @@ -144,19 +149,34 @@ public class AvroCLIClient { "--filename and --dirname options cannot be used simultaneously"); } - if (!commandLine.hasOption("port")) { - throw new ParseException( - "You must specify a port to connect to with --port"); + if (!commandLine.hasOption("port") && !commandLine.hasOption("host") && + !commandLine.hasOption("rpcProps")) { + throw new ParseException("Either --rpcProps or both --host and --port " + + "must be specified."); } - port = Integer.parseInt(commandLine.getOptionValue("port")); + if (commandLine.hasOption("rpcProps")) { + rpcClientPropsFile = commandLine.getOptionValue("rpcProps"); + Preconditions.checkNotNull(rpcClientPropsFile, "RPC client properties " + + "file must be specified after --rpcProps argument."); + Preconditions.checkArgument(new File(rpcClientPropsFile).exists(), + "RPC client properties file %s does not exist!", rpcClientPropsFile); + } - if (!commandLine.hasOption("host")) { - throw new ParseException( - "You must specify a hostname to connect to with --host"); + if (rpcClientPropsFile == null) { + if (!commandLine.hasOption("port")) { + throw new ParseException( + "You must specify a port to connect to with --port"); + } + port = Integer.parseInt(commandLine.getOptionValue("port")); + + if (!commandLine.hasOption("host")) { + throw new ParseException( + "You must specify a hostname to connect to with --host"); + } + hostname = commandLine.getOptionValue("host"); } - hostname = commandLine.getOptionValue("host"); fileName = commandLine.getOptionValue("filename"); dirName = commandLine.getOptionValue("dirname"); @@ -172,8 +192,14 @@ public class AvroCLIClient { EventReader reader = null; - RpcClient rpcClient = RpcClientFactory.getDefaultInstance(hostname, port, - BATCH_SIZE); + RpcClient rpcClient; + if (rpcClientPropsFile != null) { + rpcClient = RpcClientFactory.getInstance(new File(rpcClientPropsFile)); + } else { + rpcClient = RpcClientFactory.getDefaultInstance(hostname, port, + BATCH_SIZE); + } + try { if (fileName != null) { reader = new SimpleTextLineEventReader(new FileReader(new File(fileName)));
