Repository: metron Updated Branches: refs/heads/master aee018476 -> cc111ec98
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java index 49f111d..3316b32 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java @@ -17,10 +17,12 @@ */ package org.apache.metron.parsers.bolt; +import org.apache.curator.framework.CuratorFramework; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.*; import org.apache.metron.common.error.MetronError; +import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; import org.apache.metron.test.error.MetronErrorJSONMatcher; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -115,13 +117,35 @@ public class ParserBoltTest extends BaseBoltTest { } } + private static ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return new ConfigurationsUpdater<ParserConfigurations>(null, null) { + @Override + public void update(CuratorFramework client, String path, byte[] data) throws IOException { } + + @Override + public void delete(CuratorFramework client, String path, byte[] data) throws IOException { } + + @Override + public ConfigurationType getType() { + return ConfigurationType.PARSER; + } + + @Override + public void update(String name, byte[] data) throws IOException { } + + @Override + public void delete(String name) { } + + @Override + public Class<ParserConfigurations> getConfigurationClass() { + return ParserConfigurations.class; + } - @Test - public void testEmpty() throws Exception { - String sensorType = "yaf"; - ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) { @Override - protected ParserConfigurations defaultConfigurations() { + public void forceUpdate(CuratorFramework client) { } + + @Override + public ParserConfigurations defaultConfigurations() { return new ParserConfigurations() { @Override public SensorParserConfig getSensorParserConfig(String sensorType) { @@ -135,11 +159,22 @@ public class ParserBoltTest extends BaseBoltTest { } }; } + }; + } + + @Test + public void testEmpty() throws Exception { + String sensorType = "yaf"; + ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) { + @Override + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return ParserBoltTest.createUpdater(); + } }; parserBolt.setCuratorFramework(client); - parserBolt.setTreeCache(cache); + parserBolt.setZKCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); verify(writer, times(1)).init(); @@ -165,28 +200,15 @@ public class ParserBoltTest extends BaseBoltTest { String sensorType = "yaf"; ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) { @Override - protected ParserConfigurations defaultConfigurations() { - return new ParserConfigurations() { - @Override - public SensorParserConfig getSensorParserConfig(String sensorType) { - return new SensorParserConfig() { - @Override - public Map<String, Object> getParserConfig() { - return new HashMap<String, Object>() {{ - }}; - } - - - }; - } - }; + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return ParserBoltTest.createUpdater(); } }; buildGlobalConfig(parserBolt); parserBolt.setCuratorFramework(client); - parserBolt.setTreeCache(cache); + parserBolt.setZKCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); byte[] sampleBinary = "some binary message".getBytes(); @@ -218,23 +240,13 @@ public class ParserBoltTest extends BaseBoltTest { String sensorType = "yaf"; ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(writer)) { @Override - protected ParserConfigurations defaultConfigurations() { - return new ParserConfigurations() { - @Override - public SensorParserConfig getSensorParserConfig(String sensorType) { - return new SensorParserConfig() { - @Override - public Map<String, Object> getParserConfig() { - return new HashMap<String, Object>() {{ - }}; - } - }; - } - }; + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return ParserBoltTest.createUpdater(); } + }; parserBolt.setCuratorFramework(client); - parserBolt.setTreeCache(cache); + parserBolt.setZKCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); verify(writer, times(1)).init(); @@ -274,24 +286,13 @@ public void testImplicitBatchOfOne() throws Exception { ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { @Override - protected ParserConfigurations defaultConfigurations() { - return new ParserConfigurations() { - @Override - public SensorParserConfig getSensorParserConfig(String sensorType) { - return new SensorParserConfig() { - @Override - public Map<String, Object> getParserConfig() { - return new HashMap<String, Object>() {{ - }}; - } - }; - } - }; + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return ParserBoltTest.createUpdater(); } }; parserBolt.setCuratorFramework(client); - parserBolt.setTreeCache(cache); + parserBolt.setZKCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); verify(batchWriter, times(1)).init(any(), any(), any()); @@ -334,10 +335,14 @@ public void testImplicitBatchOfOne() throws Exception { throw new RuntimeException(e); } } + @Override + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return ParserBoltTest.createUpdater(); + } }; parserBolt.setCuratorFramework(client); - parserBolt.setTreeCache(cache); + parserBolt.setZKCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); verify(batchWriter, times(1)).init(any(), any(), any()); @@ -371,10 +376,15 @@ public void testImplicitBatchOfOne() throws Exception { throw new RuntimeException(e); } } + + @Override + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return ParserBoltTest.createUpdater(); + } }; parserBolt.setCuratorFramework(client); - parserBolt.setTreeCache(cache); + parserBolt.setZKCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); verify(batchWriter, times(1)).init(any(), any(), any()); @@ -440,10 +450,15 @@ public void testImplicitBatchOfOne() throws Exception { throw new RuntimeException(e); } } + + @Override + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return ParserBoltTest.createUpdater(); + } }; parserBolt.setCuratorFramework(client); - parserBolt.setTreeCache(cache); + parserBolt.setZKCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); when(t1.getBinary(0)).thenReturn(new byte[] {}); parserBolt.execute(t1); @@ -459,25 +474,13 @@ public void testImplicitBatchOfOne() throws Exception { ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { @Override - protected ParserConfigurations defaultConfigurations() { - return new ParserConfigurations() { - @Override - public SensorParserConfig getSensorParserConfig(String sensorType) { - return new SensorParserConfig() { - @Override - public Map<String, Object> getParserConfig() { - return new HashMap<String, Object>() {{ - put(IndexingConfigurations.BATCH_SIZE_CONF, "1"); - }}; - } - }; - } - }; + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return ParserBoltTest.createUpdater(); } }; parserBolt.setCuratorFramework(client); - parserBolt.setTreeCache(cache); + parserBolt.setZKCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); verify(batchWriter, times(1)).init(any(), any(), any()); @@ -498,25 +501,13 @@ public void testImplicitBatchOfOne() throws Exception { ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { @Override - protected ParserConfigurations defaultConfigurations() { - return new ParserConfigurations() { - @Override - public SensorParserConfig getSensorParserConfig(String sensorType) { - return new SensorParserConfig() { - @Override - public Map<String, Object> getParserConfig() { - return new HashMap<String, Object>() {{ - put(IndexingConfigurations.BATCH_SIZE_CONF, 5); - }}; - } - }; - } - }; + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return ParserBoltTest.createUpdater(); } }; parserBolt.setCuratorFramework(client); - parserBolt.setTreeCache(cache); + parserBolt.setZKCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); verify(batchWriter, times(1)).init(any(), any(), any()); @@ -541,31 +532,20 @@ public void testImplicitBatchOfOne() throws Exception { } + @Test public void testBatchOfFiveWithError() throws Exception { String sensorType = "yaf"; ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) { @Override - protected ParserConfigurations defaultConfigurations() { - return new ParserConfigurations() { - @Override - public SensorParserConfig getSensorParserConfig(String sensorType) { - return new SensorParserConfig() { - @Override - public Map<String, Object> getParserConfig() { - return new HashMap<String, Object>() {{ - put(IndexingConfigurations.BATCH_SIZE_CONF, 5); - }}; - } - }; - } - }; + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return ParserBoltTest.createUpdater(); } }; parserBolt.setCuratorFramework(client); - parserBolt.setTreeCache(cache); + parserBolt.setZKCache(cache); parserBolt.prepare(new HashMap(), topologyContext, outputCollector); verify(parser, times(1)).init(); verify(batchWriter, times(1)).init(any(), any(), any()); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-test-utilities/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-test-utilities/pom.xml b/metron-platform/metron-test-utilities/pom.xml index 2502760..38f8a38 100644 --- a/metron-platform/metron-test-utilities/pom.xml +++ b/metron-platform/metron-test-utilities/pom.xml @@ -28,6 +28,11 @@ </properties> <dependencies> <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-zookeeper</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> @@ -127,11 +132,7 @@ </exclusion> </exclusions> </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-recipes</artifactId> - <version>${global_curator_version}</version> - </dependency> + <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java index 75f999a..ac64b6a 100644 --- a/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java +++ b/metron-platform/metron-test-utilities/src/main/java/org/apache/metron/test/bolt/BaseBoltTest.java @@ -17,6 +17,7 @@ */ package org.apache.metron.test.bolt; +import org.apache.metron.zookeeper.ZKCache; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -54,7 +55,7 @@ public abstract class BaseBoltTest { protected CuratorFramework client; @Mock - protected TreeCache cache; + protected ZKCache cache; @Before public void initMocks() { http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-zookeeper/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-zookeeper/pom.xml b/metron-platform/metron-zookeeper/pom.xml new file mode 100644 index 0000000..e02cafd --- /dev/null +++ b/metron-platform/metron-zookeeper/pom.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.metron</groupId> + <artifactId>metron-platform</artifactId> + <version>0.4.1</version> + </parent> + <artifactId>metron-zookeeper</artifactId> + <name>metron-zookeeper</name> + <url>https://metron.apache.org/</url> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-client</artifactId> + <version>${global_curator_version}</version> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${global_curator_version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${global_guava_version}</version> + <scope>provided</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/SimpleEventListener.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/SimpleEventListener.java b/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/SimpleEventListener.java new file mode 100644 index 0000000..1078f1e --- /dev/null +++ b/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/SimpleEventListener.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.zookeeper; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; + +/** + * This is a simple convenience implementation of a TreeCacheListener. + * It allows multiple callbacks to be called with one listener. + */ +public class SimpleEventListener implements TreeCacheListener { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * The callback interface. This is to be implemented for all callbacks bound to a SimpleEventListener + */ + public interface Callback { + /** + * Called upon an event. + * @param client The zookeeper client + * @param path The zookeeper path changed + * @param data The data that changed. + * @throws IOException + */ + void apply(CuratorFramework client, String path, byte[] data) throws IOException; + } + + /** + * Builder to create a SimpleEventListener + */ + public static class Builder { + private EnumMap<TreeCacheEvent.Type, List<Callback>> callbacks = new EnumMap<>(TreeCacheEvent.Type.class); + + /** + * Add a callback bound to one or more TreeCacheEvent.Type. + * @param callback The callback to be called when an event of each of types happens + * @param types The zookeeper event types to bind to + * @return The Builder + */ + public Builder with(Callback callback, TreeCacheEvent.Type... types) { + return with(ImmutableList.of(callback), types); + } + + /** + * Add a callback bound to one or more TreeCacheEvent.Type. + * @param callback The iterable of callbacks to be called when an event of each of types happens + * @param types The zookeeper event types to bind to + * @return The Builder + */ + public Builder with(Iterable<? extends Callback> callback, TreeCacheEvent.Type... types) { + for(TreeCacheEvent.Type t : types) { + List<Callback> cbs = callbacks.get(t); + if(cbs == null) { + cbs = new ArrayList<>(); + } + Iterables.addAll(cbs, callback); + callbacks.put(t, cbs); + } + return this; + } + + /** + * Create the listener. + * @return The SimpleEventListener + */ + public SimpleEventListener build() { + return new SimpleEventListener(callbacks); + } + + } + + EnumMap<TreeCacheEvent.Type, List<Callback>> callbacks; + + private SimpleEventListener(EnumMap<TreeCacheEvent.Type, List<Callback>> callbacks) { + this.callbacks = callbacks; + } + + @Override + public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { + String path = null; + byte[] data = null; + if(event != null && event.getData() != null) { + path = event.getData().getPath(); + data = event.getData().getData(); + } + LOG.debug("Type: {}, Path: {}, Data: {}", event.getType(), (path == null?"":path) , (data == null?"":new String(data))); + List<Callback> callback = callbacks.get(event.getType()); + if(callback != null) { + for(Callback cb : callback) { + cb.apply(client, path, data); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/ZKCache.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/ZKCache.java b/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/ZKCache.java new file mode 100644 index 0000000..58a6329 --- /dev/null +++ b/metron-platform/metron-zookeeper/src/main/java/org/apache/metron/zookeeper/ZKCache.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.zookeeper; + +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * A zookeeper cache that composes the Curator TreeCache. This is the basic point of + * abstraction to interact with metron configuration in Zookeeper. + */ +public class ZKCache implements AutoCloseable{ + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final int DEFAULT_CLIENT_SLEEP_MS = 1000; + public static final int DEFAULT_MAX_RETRIES = 3; + + + /** + * Build a ZKCache instance. + */ + public static class Builder { + private Optional<CuratorFramework> client = Optional.empty(); + private boolean ownClient = false; + private List<TreeCacheListener> listener = new ArrayList<>(); + private String zkRoot; + + public Builder() { } + + /** + * Specify your own client. If you specify this, closing will not close your Client. + * If a client is not passed in, then one is created and will be closed when the ZKCache + * closes. + * @param client The CuratorFramework client. + * @return The Builder + */ + public Builder withClient(CuratorFramework client) { + this.client = Optional.ofNullable(client); + ownClient = false; + return this; + } + + /** + * Specify your own zookeeper URL. If you pass this in, the ZKCache will own the client + * and it will be closed when the ZKCache is closed. + * + * @param zookeeperUrl The zookeeper quorum + * @return The Builder + */ + public Builder withClient(String zookeeperUrl) { + this.client = Optional.ofNullable(createClient(zookeeperUrl, Optional.empty())); + ownClient = true; + return this; + } + + /** + * Specify your own zookeeper URL. If you pass this in, the ZKCache will own the client + * and it will be closed when the ZKCache is closed. + * + * @param zookeeperUrl The zookeeper quorum + * @param retryPolicy The RetryPolicy to use + * @return The Builder + */ + public Builder withClient(String zookeeperUrl, RetryPolicy retryPolicy) { + this.client = Optional.ofNullable(createClient(zookeeperUrl, Optional.ofNullable(retryPolicy))); + ownClient = true; + return this; + } + + /** + * Specify the treecache listener, which will be called when changes happen to the zookeeper root. + * + * @param listener The callback which is called when changes happen in zookeeper. + * @return The Builder + */ + public Builder withListener(TreeCacheListener listener) { + this.listener.add(listener); + return this; + } + + /** + * Specify the root in zookeeper to monitor. + * @param zkRoot The root path in zookeeper + * @return The Builder + */ + public Builder withRoot(String zkRoot) { + this.zkRoot = zkRoot; + return this; + } + + /** + * Create the ZKCache object based on the config passed in the Builder. + * @return The ZKCache + */ + public ZKCache build() { + if(!client.isPresent()) { + throw new IllegalArgumentException("Zookeeper client must be specified."); + } + if(listener.isEmpty()) { + LOG.warn("Zookeeper listener is null or empty, which is very likely an error."); + } + if(zkRoot == null) { + throw new IllegalArgumentException("Zookeeper root must not be null."); + } + return new ZKCache(client.get(), listener, zkRoot, ownClient); + } + + } + + private CuratorFramework client; + private List<TreeCacheListener> listeners; + private TreeCache cache; + private String zkRoot; + private boolean ownClient = false; + + private ZKCache(CuratorFramework client, List<TreeCacheListener> listeners, String zkRoot, boolean ownClient) { + this.client = client; + this.listeners = listeners; + this.ownClient = ownClient; + if(zkRoot == null) { + throw new IllegalArgumentException("Zookeeper root must not be null."); + } + this.zkRoot = zkRoot; + } + + /** + * Return the client used. + * NOTE: DO NOT CLOSE THIS CLIENT OUT OF BAND. + * @return The Curator Client + */ + public CuratorFramework getClient() { + return client; + } + + + /** + * Start the cache. + * @throws Exception If unable to be started. + */ + public void start() throws Exception { + if(cache == null) { + if(ownClient) { + client.start(); + } + TreeCache.Builder builder = TreeCache.newBuilder(client, zkRoot); + builder.setCacheData(true); + cache = builder.build(); + for(TreeCacheListener l : listeners) { + cache.getListenable().addListener(l); + } + cache.start(); + } + } + + /** + * Close the cache, which closes the client if it's owned by the ZKCache. + */ + @Override + public void close() { + cache.close(); + if(ownClient) { + client.close(); + } + } + + public static CuratorFramework createClient(String zookeeperUrl, Optional<RetryPolicy> retryPolicy) { + return CuratorFrameworkFactory.newClient(zookeeperUrl, retryPolicy.orElse(new ExponentialBackoffRetry(DEFAULT_CLIENT_SLEEP_MS, DEFAULT_MAX_RETRIES))); + } + + +} http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml index 93ced81..3e72d78 100644 --- a/metron-platform/pom.xml +++ b/metron-platform/pom.xml @@ -60,6 +60,7 @@ <module>metron-elasticsearch</module> <module>metron-storm-kafka</module> <module>metron-storm-kafka-override</module> + <module>metron-zookeeper</module> </modules> <dependencies> <dependency> http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-stellar/stellar-common/src/main/scripts/stellar ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/scripts/stellar b/metron-stellar/stellar-common/src/main/scripts/stellar index a93d09e..2f1cdbe 100644 --- a/metron-stellar/stellar-common/src/main/scripts/stellar +++ b/metron-stellar/stellar-common/src/main/scripts/stellar @@ -33,4 +33,4 @@ export METRON_VERSION=${project.version} export METRON_HOME=/usr/metron/$METRON_VERSION export STELLAR_LIB=$(find $METRON_HOME/lib/ -name metron-parsers*.jar) export MANAGEMENT_LIB=$(find $METRON_HOME/lib/ -name metron-management*.jar) -java $JVMFLAGS -cp "$HBASE_CONFIGS:${CONTRIB:-$METRON_HOME/contrib}:$STELLAR_LIB:$MANAGEMENT_LIB" org.apache.metron.stellar.common.shell.StellarShell "$@" +java $JVMFLAGS -cp "${CONTRIB:-$METRON_HOME/contrib}:$STELLAR_LIB:$MANAGEMENT_LIB:$HBASE_CONFIGS" org.apache.metron.stellar.common.shell.StellarShell "$@"
