[
https://issues.apache.org/jira/browse/METRON-590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15761957#comment-15761957
]
ASF GitHub Bot commented on METRON-590:
---------------------------------------
Github user ottobackwards commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/395#discussion_r93092757
--- Diff:
metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/manager/ZkConfigurationManager.java
---
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.common.configuration.manager;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.metron.common.utils.JSONUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.commons.lang.ArrayUtils.isNotEmpty;
+
+/**
+ * Responsible for managing configuration values that are created, persisted, and updated
+ * in Zookeeper.
+ */
+public class ZkConfigurationManager implements ConfigurationManager {
+
+ /**
+ * A Zookeeper client.
+ */
+ private CuratorFramework zookeeperClient;
+
+ /**
+ * The configuration values under management. Maps the path to the configuration values
+ * in Zookeeper to the cache of its values.
+ */
+ private Map<String, NodeCache> valuesCache;
+
+ /**
+ * @param zookeeperClient The client used to communicate with Zookeeper. The client is not
+ * closed. It must be managed externally.
+ */
+ public ZkConfigurationManager(CuratorFramework zookeeperClient) {
+ this.zookeeperClient = zookeeperClient;
+ this.valuesCache = Collections.synchronizedMap(new HashMap<>());
+ }
+
+ /**
+ * Define a path within Zookeeper that contains configuration values that need to be managed.
+ * @param zookeeperPath The Zookeeper path.
+ */
+ @Override
+ public ZkConfigurationManager with(String zookeeperPath) {
+ NodeCache cache = new NodeCache(zookeeperClient, zookeeperPath);
+ valuesCache.put(zookeeperPath, cache);
+ return this;
+ }
+
+ /**
+ * Open a connection to Zookeeper and retrieve the initial configuration value.
+ */
+ @Override
+ public synchronized ZkConfigurationManager open() throws IOException {
+ try {
+ doOpen();
+ } catch(Exception e) {
+ throw new IOException(e);
+ }
+
+ return this;
+ }
+
+ private void doOpen() throws Exception {
+ for (NodeCache cache : valuesCache.values()) {
+ cache.start(true);
+ }
+ }
+
+ /**
+ * Retrieve the configuration object.
+ */
+ @Override
+ public synchronized <T> Optional<T> get(String key, Class<T> clazz) throws IOException {
+ T result = null;
+
+ NodeCache cache = valuesCache.get(key);
+ if(cache != null && cache.getCurrentData() != null && isNotEmpty(cache.getCurrentData().getData())) {
+ result = deserialize(cache.getCurrentData().getData(), clazz);
+ }
+
+ return Optional.ofNullable(result);
+ }
+
+ /**
+ * Close the configuration manager.
+ *
+ * Does not close the zookeeperClient that was passed in to the constructor.
+ */
+ @Override
+ public synchronized void close() {
+ for (NodeCache cache : valuesCache.values()) {
+ CloseableUtils.closeQuietly(cache);
--- End diff --
my apologies
> Enable Use of Event Time in Profiler
> ------------------------------------
>
> Key: METRON-590
> URL: https://issues.apache.org/jira/browse/METRON-590
> Project: Metron
> Issue Type: Improvement
> Reporter: Nick Allen
> Assignee: Nick Allen
>
> There are at least two different times that are important to consider when
> handling the telemetry messages received by Metron.
> (1) Processing time is the time at which Metron processed the message.
> (2) Event time is the time at which the event actually occurred.
> If Metron is consuming live data and all is well, the processing and event
> times may remain close and consistent. When processing time differs from
> event time, the data produced by the Profiler may be inaccurate. There are a
> few scenarios in which these times might differ greatly, which would
> negatively impact the feature set produced by the Profiler.
> (1) The system has experienced an outage, for example a scheduled
> maintenance window. When it is restarted, a high volume of messages will need
> to be processed by the Profiler. The output of the Profiler will indicate an
> increase in activity, although no change in activity actually occurred on the
> target network. This could happen whether the outage was in Metron itself or
> in an upstream system that feeds data to Metron.
> (2) If the user attempts to replay historical telemetry through the Profiler,
> the Profiler will attribute the activity to the time period in which it was
> processed. The activity should instead be attributed to the time period in
> which the raw telemetry events originated.
> There are some scenarios when processing time might be preferred and other
> use cases where event time is preferred. The Profiler should be enhanced to
> allow it to produce profiles based on either processing time or event time.
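
The difference between the two notions of time can be illustrated with a small, self-contained sketch. This is not the Profiler's code; the class `TimeBucketingSketch`, its methods, and the 15-minute period are all hypothetical and chosen only for illustration. It counts messages per fixed-width period, once keyed by event time and once keyed by processing time, for three events that occurred in different periods but were all processed together after an outage.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: not the Metron Profiler's code. All names are hypothetical.
class TimeBucketingSketch {

    // Start (in epoch millis) of the fixed-width period containing the timestamp.
    static long periodStart(long timestampMillis, long periodMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, periodMillis);
    }

    // Count messages per period, keyed by the start of each period.
    static Map<Long, Integer> countPerPeriod(long[] timestampsMillis, long periodMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMillis) {
            counts.merge(periodStart(ts, periodMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long period = 15 * 60 * 1000L; // assumed 15-minute profile period

        // Three events that occurred in three consecutive periods...
        long[] eventTimes = {0L, period, 2 * period};
        // ...but were all processed within the same later period, e.g. after an outage.
        long[] processingTimes = {4 * period, 4 * period + 1, 4 * period + 2};

        System.out.println("by event time:      " + countPerPeriod(eventTimes, period));
        System.out.println("by processing time: " + countPerPeriod(processingTimes, period));
    }
}
```

Keyed by event time, the three messages land in three separate periods with one message each. Keyed by processing time, all three land in a single period, which is the apparent spike in activity described in scenario (1).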