Repository: incubator-metron Updated Branches: refs/heads/master eb6739399 -> 5843421f5
METRON-428: Allow a kafka offset to be passed to the ParserTopology CLI closes apache/incubator-metron#258 Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/5843421f Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/5843421f Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/5843421f Branch: refs/heads/master Commit: 5843421f5205d007569b274363adc3ee03dc8efe Parents: eb67393 Author: cstella <ceste...@gmail.com> Authored: Tue Sep 20 11:20:05 2016 -0400 Committer: cstella <ceste...@gmail.com> Committed: Tue Sep 20 11:20:05 2016 -0400 ---------------------------------------------------------------------- .../metron/parsers/topology/ParserTopologyCLI.java | 10 ++++++++++ .../parsers/topology/ParserTopologyCLITest.java | 16 ++++++++++++++++ 2 files changed, 26 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5843421f/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java index 7bf56e3..0ebec6c 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java @@ -183,6 +183,13 @@ public class ParserTopologyCLI { o.setRequired(false); return o; }) + ,KAFKA_OFFSET("koff", code -> + { + Option o = new Option("koff", "kafka_offset", true, "Kafka offset"); + o.setArgName("BEGINNING|WHERE_I_LEFT_OFF"); + o.setRequired(false); + return o; + }) ; Option option; String shortCode; @@ -285,6 +292,9 @@ public class ParserTopologyCLI { spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd))); } SpoutConfig.Offset offset = cmd.hasOption("t") ? SpoutConfig.Offset.BEGINNING : SpoutConfig.Offset.WHERE_I_LEFT_OFF; + if(cmd.hasOption("koff")) { + offset = SpoutConfig.Offset.valueOf(cmd.getOptionValue("koff")); + } TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl, brokerUrl, sensorType, http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/5843421f/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java index 5f0548a..2940bfe 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java @@ -74,6 +74,22 @@ public class ParserTopologyCLITest { @Test + public void testKafkaOffset_happyPath() throws ParseException { + kafkaOffset(true); + kafkaOffset(false); + } + public void kafkaOffset(boolean longOpt) throws ParseException { + CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker") + .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk") + .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPE, "mysensor") + .with(ParserTopologyCLI.ParserOptions.KAFKA_OFFSET, "BEGINNING") + .build(longOpt); + Assert.assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(cli)); + Assert.assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(cli)); + Assert.assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPE.get(cli)); + Assert.assertEquals("BEGINNING", ParserTopologyCLI.ParserOptions.KAFKA_OFFSET.get(cli)); + } + @Test public void testCLI_happyPath() throws ParseException { happyPath(true); happyPath(false);