clean up eagle-data-process Author: @yonzhang2012 <[email protected]> Closes: #343
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b4732cb2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b4732cb2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b4732cb2 Branch: refs/heads/develop Commit: b4732cb2fc104350e2f9e6c3edb8518f41a21a11 Parents: c66b525 Author: yonzhang <[email protected]> Authored: Sun Aug 14 16:56:44 2016 -0700 Committer: yonzhang <[email protected]> Committed: Sun Aug 14 16:56:44 2016 -0700 ---------------------------------------------------------------------- .../AbstractDynamicApplication.scala | 30 ----- .../src/main/java/META-INF/MANIFEST.MF | 19 --- .../impl/storm/kafka/JsonSerializer.java | 58 --------- .../storm/kafka/KafkaSourcedSpoutProvider.java | 104 ---------------- .../storm/kafka/KafkaSourcedSpoutScheme.java | 71 ----------- .../impl/storm/kafka/KafkaSpoutProvider.java | 118 +++++++++++++++++++ .../kafka/NewKafkaSourcedSpoutProvider.java | 118 ------------------- .../eagle/datastream/utils/JavaReflections.java | 31 ----- .../src/main/resources/application.conf | 1 - .../eagle/datastream/utils/ReflectionS.scala | 55 --------- .../entity/AbstractPolicyDefinitionEntity.java | 27 ----- .../alert/entity/AlertStreamSchemaEntity.java | 111 ----------------- .../eagle/policy/DefaultPolicyPartitioner.java | 32 ----- .../eagle/policy/PolicyEvaluationContext.java | 34 ------ .../apache/eagle/policy/PolicyEvaluator.java | 61 ---------- .../apache/eagle/policy/PolicyPartitioner.java | 26 ---- .../org/apache/eagle/policy/ResultRender.java | 32 ----- .../eagle/policy/executor/IPolicyExecutor.java | 29 ----- .../policy/siddhi/SiddhiEvaluationHandler.java | 27 ----- .../src/main/resources/log4j.properties | 21 ---- .../org/apache/eagle/gc/GCLogApplication.java | 4 +- eagle-hadoop-metric/pom.xml | 24 ---- .../assembly/eagle-hadoop-metric-assembly.xml | 64 ---------- .../hadoop/metric/HadoopJmxApplication.java | 32 +++++ .../eagle/hadoop/metric/JsonParserBolt.java | 62 ++++++++++ .../org/apache/eagle/hadoop/metric/Utils.java | 64 ---------- .../src/main/resources/application.conf | 52 +++----- .../kafka/EagleMetricCollectorApplication.java | 71 +---------- .../metric/kafka/KafkaSourcedSpoutProvider.java | 93 +++++++++++++++ .../metric/kafka/KafkaSourcedSpoutScheme.java | 72 +++++++++++ .../hbase/HBaseAuditLogApplication.java | 4 +- .../AbstractHdfsAuditLogApplication.java | 4 +- .../securitylog/HdfsAuthLogMonitoringMain.java | 4 +- .../oozie/parse/OozieAuditLogApplication.java | 4 +- 34 files changed, 405 insertions(+), 1154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala deleted file mode 100644 index b5fbf59..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application - -import com.typesafe.config.Config -import org.apache.eagle.datastream.core.StreamContext - - -trait AbstractDynamicApplication extends TopologyExecutable { - def compileStream(application: String, config: Config): StreamContext = { - null - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-common/src/main/java/META-INF/MANIFEST.MF ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-common/src/main/java/META-INF/MANIFEST.MF b/eagle-core/eagle-common/src/main/java/META-INF/MANIFEST.MF deleted file mode 100644 index c67816b..0000000 --- a/eagle-core/eagle-common/src/main/java/META-INF/MANIFEST.MF +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -Manifest-Version: 1.0 -Class-Path: - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java deleted file mode 100644 index 416aaa3..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.storm.kafka; - -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.SerializationConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Map; - -public class JsonSerializer implements Serializer<Object> { - private final StringSerializer stringSerializer = new StringSerializer(); - private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class); - private static final ObjectMapper om = new ObjectMapper(); - - static { - om.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, true); - } - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - stringSerializer.configure(configs,isKey); - } - - @Override - public byte[] serialize(String topic, Object data) { - String str = null; - try { - str = om.writeValueAsString(data); - } catch (IOException e) { - logger.error("Kafka serialization for send error!", e); - } - return stringSerializer.serialize(topic, str); - } - - @Override - public void close() { - stringSerializer.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java deleted file mode 100644 index c6a4983..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.storm.kafka; - -import java.util.Arrays; - -import com.typesafe.config.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.BrokerHosts; -import storm.kafka.KafkaSpout; -import storm.kafka.SpoutConfig; -import storm.kafka.ZkHosts; -import backtype.storm.spout.SchemeAsMultiScheme; -import backtype.storm.topology.base.BaseRichSpout; - -import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider; - -public class KafkaSourcedSpoutProvider implements StormSpoutProvider { - private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class); - - public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) { - return new SchemeAsMultiScheme(new KafkaSourcedSpoutScheme(deserClsName, context)); - } - - private String configPrefix = "dataSourceConfig"; - - public KafkaSourcedSpoutProvider(){} - - public KafkaSourcedSpoutProvider(String prefix){ - this.configPrefix = prefix; - } - - @Override - public BaseRichSpout getSpout(Config config){ - Config context = config; - if(this.configPrefix!=null) context = config.getConfig(configPrefix); - // Kafka topic - String topic = context.getString("topic"); - // Kafka consumer group id - String groupId = context.getString("consumerGroupId"); - // Kafka fetch size - int fetchSize = context.getInt("fetchSize"); - // Kafka deserializer class - String deserClsName = context.getString("deserializerClass"); - // Kafka broker zk connection - String zkConnString = context.getString("zkConnection"); - // transaction zkRoot - String zkRoot = context.getString("transactionZKRoot"); - - LOG.info(String.format("Use topic id: %s",topic)); - - String brokerZkPath = null; - if(context.hasPath("brokerZkPath")) { - brokerZkPath = context.getString("brokerZkPath"); - } - - BrokerHosts hosts; - if(brokerZkPath == null) { - hosts = new ZkHosts(zkConnString); - } else { - hosts = new ZkHosts(zkConnString, brokerZkPath); - } - - SpoutConfig spoutConfig = new SpoutConfig(hosts, - topic, - zkRoot + "/" + topic, - groupId); - - // transaction zkServers - spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(",")); - // transaction zkPort - spoutConfig.zkPort = context.getInt("transactionZKPort"); - // transaction update interval - spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS"); - // Kafka fetch size - spoutConfig.fetchSizeBytes = fetchSize; - // "startOffsetTime" is for test usage, prod should not use this - if (context.hasPath("startOffsetTime")) { - spoutConfig.startOffsetTime = context.getInt("startOffsetTime"); - } - // "forceFromStart" is for test usage, prod should not use this - if (context.hasPath("forceFromStart")) { - spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); - } - - spoutConfig.scheme = getStreamScheme(deserClsName, context); - return new KafkaSpout(spoutConfig); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java deleted file mode 100644 index 15401fd..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.storm.kafka; - -import backtype.storm.spout.Scheme; -import backtype.storm.tuple.Fields; -import com.typesafe.config.Config; -import org.apache.eagle.datastream.utils.NameConstants; - -import java.lang.reflect.Constructor; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; - -/** - * This scheme defines how a kafka message is deserialized and the output field name for storm stream - * it includes the following: - * 1. data source is kafka, so need kafka message deserializer class - * 2. output field declaration - */ -public class KafkaSourcedSpoutScheme implements Scheme { - protected SpoutKafkaMessageDeserializer deserializer; - - public KafkaSourcedSpoutScheme(String deserClsName, Config context){ - try{ - Properties prop = new Properties(); - if(context.hasPath("eagleProps")) { - prop.putAll(context.getObject("eagleProps")); - } - Constructor<?> constructor = Class.forName(deserClsName).getConstructor(Properties.class); - deserializer = (SpoutKafkaMessageDeserializer) constructor.newInstance(prop); - }catch(Exception ex){ - throw new RuntimeException("Failed to create new instance for decoder class " + deserClsName, ex); - } - } - - @Override - public List<Object> deserialize(byte[] ser) { - Object tmp = deserializer.deserialize(ser); - if(tmp == null) - return null; - // the following tasks are executed within the same process of kafka spout - return Arrays.asList(tmp); - } - - /** - * Default only f0, but it requires to be overrode if different - * - * TODO: Handle the schema with KeyValue based structure - * - * @return Fields - */ - @Override - public Fields getOutputFields() { - return new Fields(NameConstants.FIELD_PREFIX()+"0"); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java new file mode 100644 index 0000000..2d2936c --- /dev/null +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java @@ -0,0 +1,118 @@ +/* + * + * * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * + * + */ + +package org.apache.eagle.dataproc.impl.storm.kafka; + +import backtype.storm.spout.Scheme; +import backtype.storm.spout.SchemeAsMultiScheme; +import backtype.storm.topology.base.BaseRichSpout; +import com.typesafe.config.Config; +import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.kafka.BrokerHosts; +import storm.kafka.KafkaSpout; +import storm.kafka.SpoutConfig; +import storm.kafka.ZkHosts; + +import java.util.Arrays; + +/** + * Since 6/8/16. + */ +public class KafkaSpoutProvider implements StormSpoutProvider { + private final static Logger LOG = LoggerFactory.getLogger(KafkaSpoutProvider.class); + + private String configPrefix = "dataSourceConfig"; + + public KafkaSpoutProvider(){} + + public KafkaSpoutProvider(String prefix){ + this.configPrefix = prefix; + } + + @Override + public BaseRichSpout getSpout(Config config){ + Config context = config; + if(this.configPrefix!=null) context = config.getConfig(configPrefix); + // Kafka topic + String topic = context.getString("topic"); + // Kafka consumer group id + String groupId = context.getString("consumerGroupId"); + // Kafka fetch size + int fetchSize = context.getInt("fetchSize"); + // Kafka broker zk connection + String zkConnString = context.getString("zkConnection"); + // transaction zkRoot + String zkRoot = context.getString("transactionZKRoot"); + + LOG.info(String.format("Use topic id: %s",topic)); + + String brokerZkPath = null; + if(context.hasPath("brokerZkPath")) { + brokerZkPath = context.getString("brokerZkPath"); + } + + BrokerHosts hosts; + if(brokerZkPath == null) { + hosts = new ZkHosts(zkConnString); + } else { + hosts = new ZkHosts(zkConnString, brokerZkPath); + } + + SpoutConfig spoutConfig = new SpoutConfig(hosts, + topic, + zkRoot + "/" + topic, + groupId); + + // transaction zkServers + spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(",")); + // transaction zkPort + spoutConfig.zkPort = context.getInt("transactionZKPort"); + // transaction update interval + spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS"); + // Kafka fetch size + spoutConfig.fetchSizeBytes = fetchSize; + // "startOffsetTime" is for test usage, prod should not use this + if (context.hasPath("startOffsetTime")) { + spoutConfig.startOffsetTime = context.getInt("startOffsetTime"); + } + // "forceFromStart" is for test usage, prod should not use this + if (context.hasPath("forceFromStart")) { + spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); + } + + if (context.hasPath("schemeCls")) { + try { + Scheme s = (Scheme)Class.forName(context.getString("schemeCls")).newInstance(); + spoutConfig.scheme = new SchemeAsMultiScheme(s); + }catch(Exception ex){ + LOG.error("error instantiating scheme object"); + throw new IllegalStateException(ex); + } + }else{ + String err = "schemeCls must be present"; + LOG.error(err); + throw new IllegalStateException(err); + } + return new KafkaSpout(spoutConfig); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java deleted file mode 100644 index d764ac1..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * - * * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * * - * - */ - -package org.apache.eagle.dataproc.impl.storm.kafka; - -import backtype.storm.spout.Scheme; -import backtype.storm.spout.SchemeAsMultiScheme; -import backtype.storm.topology.base.BaseRichSpout; -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import storm.kafka.BrokerHosts; -import storm.kafka.KafkaSpout; -import storm.kafka.SpoutConfig; -import storm.kafka.ZkHosts; - -import java.util.Arrays; - -/** - * Since 6/8/16. - */ -public class NewKafkaSourcedSpoutProvider implements StormSpoutProvider { - private final static Logger LOG = LoggerFactory.getLogger(NewKafkaSourcedSpoutProvider.class); - - private String configPrefix = "dataSourceConfig"; - - public NewKafkaSourcedSpoutProvider(){} - - public NewKafkaSourcedSpoutProvider(String prefix){ - this.configPrefix = prefix; - } - - @Override - public BaseRichSpout getSpout(Config config){ - Config context = config; - if(this.configPrefix!=null) context = config.getConfig(configPrefix); - // Kafka topic - String topic = context.getString("topic"); - // Kafka consumer group id - String groupId = context.getString("consumerGroupId"); - // Kafka fetch size - int fetchSize = context.getInt("fetchSize"); - // Kafka broker zk connection - String zkConnString = context.getString("zkConnection"); - // transaction zkRoot - String zkRoot = context.getString("transactionZKRoot"); - - LOG.info(String.format("Use topic id: %s",topic)); - - String brokerZkPath = null; - if(context.hasPath("brokerZkPath")) { - brokerZkPath = context.getString("brokerZkPath"); - } - - BrokerHosts hosts; - if(brokerZkPath == null) { - hosts = new ZkHosts(zkConnString); - } else { - hosts = new ZkHosts(zkConnString, brokerZkPath); - } - - SpoutConfig spoutConfig = new SpoutConfig(hosts, - topic, - zkRoot + "/" + topic, - groupId); - - // transaction zkServers - spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(",")); - // transaction zkPort - spoutConfig.zkPort = context.getInt("transactionZKPort"); - // transaction update interval - spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS"); - // Kafka fetch size - spoutConfig.fetchSizeBytes = fetchSize; - // "startOffsetTime" is for test usage, prod should not use this - if (context.hasPath("startOffsetTime")) { - spoutConfig.startOffsetTime = context.getInt("startOffsetTime"); - } - // "forceFromStart" is for test usage, prod should not use this - if (context.hasPath("forceFromStart")) { - spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); - } - - if (context.hasPath("schemeCls")) { - try { - Scheme s = (Scheme)Class.forName(context.getString("schemeCls")).newInstance(); - spoutConfig.scheme = new SchemeAsMultiScheme(s); - }catch(Exception ex){ - LOG.error("error instantiating scheme object"); - throw new IllegalStateException(ex); - } - }else{ - String err = "schemeCls must be present"; - LOG.error(err); - throw new IllegalStateException(err); - } - return new KafkaSpout(spoutConfig); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java deleted file mode 100644 index 04b4bed..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.utils; - -import java.lang.reflect.ParameterizedType; - -/** - * @since 12/7/15 - */ -class JavaReflections { - @SuppressWarnings("unchecked") - public static Class<?> getGenericTypeClass(final Object obj,int index) { - return (Class<?>) ((ParameterizedType) obj - .getClass() - .getGenericSuperclass()).getActualTypeArguments()[index]; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf index 72c2ae5..c386a71 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf @@ -30,7 +30,6 @@ "zkConnectionTimeoutMS" : 15000, "consumerGroupId" : "eagle.consumer", "fetchSize" : 1048586, - "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer", "transactionZKServers" : "sandbox.hortonworks.com", "transactionZKPort" : 2181, "transactionZKRoot" : "/consumers", http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala deleted file mode 100644 index 1d48752..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream.utils - -import scala.reflect.api -import scala.reflect.runtime.{universe => ru} - -/** - * @since 12/7/15 - */ -object Reflections{ - private val UNIT_CLASS = classOf[Unit] - private val UNIT_TYPE_TAG = ru.typeTag[Unit] - - /** - * Class to TypeTag - * @param clazz class - * @tparam T Type T - * @return - */ - def typeTag[T](clazz:Class[T]):ru.TypeTag[T]={ - if(clazz == null){ - null - }else if(clazz == UNIT_CLASS) { - UNIT_TYPE_TAG.asInstanceOf[ru.TypeTag[T]] - } else { - val mirror = ru.runtimeMirror(clazz.getClassLoader) - val sym = mirror.staticClass(clazz.getCanonicalName) - val tpe = sym.selfType - ru.TypeTag(mirror, new api.TypeCreator { - def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) = - if (m eq mirror) tpe.asInstanceOf[U#Type] - else throw new IllegalArgumentException(s"Type tag defined in $mirror cannot be migrated to other mirrors.") - }) - } - } - - def javaTypeClass[T](obj: AnyRef, index: Int = 0):Class[T] = JavaReflections.getGenericTypeClass(obj,index).asInstanceOf[Class[T]] - def javaTypeTag[T](obj: AnyRef, index: Int = 0):ru.TypeTag[T] = typeTag(JavaReflections.getGenericTypeClass(obj,index)).asInstanceOf[ru.TypeTag[T]] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java deleted file mode 100644 index 3f45be7..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.entity; - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; - -@SuppressWarnings("serial") -public abstract class AbstractPolicyDefinitionEntity extends TaggedLogAPIEntity { - - public abstract String getPolicyDef(); - - public abstract boolean isEnabled(); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java deleted file mode 100644 index 4dd9006..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.entity; - -import org.apache.eagle.policy.common.Constants; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.meta.Column; -import org.apache.eagle.log.entity.meta.ColumnFamily; -import org.apache.eagle.log.entity.meta.Prefix; -import org.apache.eagle.log.entity.meta.Service; -import org.apache.eagle.log.entity.meta.Table; -import org.apache.eagle.log.entity.meta.Tags; -import org.apache.eagle.log.entity.meta.TimeSeries; - -/** - * ddl to create streammetadata table - * - * create 'alertStreamSchema', {NAME => 'f', BLOOMFILTER => 'ROW', VERSIONS => '1', COMPRESSION => 'SNAPPY'} - */ -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) -@Table("alertStreamSchema") -@ColumnFamily("f") -@Prefix("alertStreamSchema") -@Service(Constants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME) -@JsonIgnoreProperties(ignoreUnknown = true) -@TimeSeries(false) -@Tags({"application", "streamName", "attrName"}) -public class AlertStreamSchemaEntity extends TaggedLogAPIEntity{ - @Column("a") - private String attrType; - @Column("b") - private String category; - @Column("c") - private String attrValueResolver; - /* all tags form the key for alert de-duplication */ - @Column("d") - private Boolean usedAsTag; - @Column("e") - private String attrDescription; - @Column("f") - private String attrDisplayName; - @Column("g") - private String defaultValue; - - public String getAttrType() { - return attrType; - } - public void setAttrType(String attrType) { - this.attrType = attrType; - valueChanged("attrType"); - } - public String getCategory() { - return category; - } - public void setCategory(String category) { - this.category = category; - valueChanged("category"); - } - public String getAttrValueResolver() { - return attrValueResolver; - } - public void setAttrValueResolver(String attrValueResolver) { - this.attrValueResolver = attrValueResolver; - valueChanged("attrValueResolver"); - } - public Boolean getUsedAsTag() { - return usedAsTag; - } - public void setUsedAsTag(Boolean usedAsTag) { - this.usedAsTag = usedAsTag; - valueChanged("usedAsTag"); - } - public String getAttrDescription() { - return attrDescription; - } - public void setAttrDescription(String attrDescription) { - this.attrDescription = attrDescription; - valueChanged("attrDescription"); - } - public String getAttrDisplayName() { - return attrDisplayName; - } - public void setAttrDisplayName(String attrDisplayName) { - this.attrDisplayName = attrDisplayName; - valueChanged("attrDisplayName"); - } - public String getDefaultValue() { - return defaultValue; - } - public void setDefaultValue(String defaultValue) { - this.defaultValue = defaultValue; - valueChanged("defaultValue"); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java deleted file mode 100644 index 1143b11..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.policy; - - -public class DefaultPolicyPartitioner implements PolicyPartitioner{ - @Override - public int partition(int numTotalPartitions, String policyType, - String policyId) { - final int prime = 31; - int result = 1; - result = result * prime + policyType.hashCode(); - result = result < 0 ? result*-1 : result; - result = result * prime + policyId.hashCode(); - result = result < 0 ? result*-1 : result; - return result % numTotalPartitions; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java deleted file mode 100644 index 7dad895..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.policy; - -import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.policy.executor.IPolicyExecutor; - -public class PolicyEvaluationContext<T extends AbstractPolicyDefinitionEntity, K> { - - public IPolicyExecutor<T, K> alertExecutor; - - public String policyId; - - public PolicyEvaluator<T> evaluator; - - public Collector outputCollector; - - public ResultRender<T, K> resultRender; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java deleted file mode 100644 index 46a63ee..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.policy; - -import java.util.Map; - -import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity; -import org.apache.eagle.dataproc.core.ValuesArray; - -/*** - * - * @param <T> - The policy definition entity - */ -public interface PolicyEvaluator<T extends AbstractPolicyDefinitionEntity> { - /** - * take input and evaluate expression - * input has 3 fields, first is siddhiAlertContext, second one is streamName, the third is map of attribute name/value - * @param input - * @throws Exception - */ - public void evaluate(ValuesArray input) throws Exception; - - /** - * notify policy evaluator that policy is updated - */ - public void onPolicyUpdate(T newAlertDef); - - /** - * notify policy evaluator that policy is deleted, here is cleanup work for this policy evaluator - */ - public void onPolicyDelete(); - - /** - * get additional context - */ - public Map<String, String> getAdditionalContext(); - - /** - * Get markdown status for the policy. - */ - public boolean isMarkdownEnabled(); - - /** - * Get markdown reason for the given policy. - */ - public String getMarkdownReason(); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java deleted file mode 100644 index fa9620c..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.policy; - -import java.io.Serializable; - -/** - * partition policies so that policies can be distributed into different alert evaluators - */ -public interface PolicyPartitioner extends Serializable { - int partition(int numTotalPartitions, String policyType, String policyId); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java deleted file mode 100644 index cc59880..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.policy; - -import com.typesafe.config.Config; -import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity; - -import java.util.List; - -/** - * @since Dec 17, 2015 - * - */ -public interface ResultRender<T extends AbstractPolicyDefinitionEntity, K> { - - K render(Config config, List<Object> rets, PolicyEvaluationContext<T, K> siddhiAlertContext, long timestamp); - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java deleted file mode 100644 index c9d28a2..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.policy.executor; - -import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity; -import org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler; - -/** - * Created on 1/10/16. - */ -public interface IPolicyExecutor<T extends AbstractPolicyDefinitionEntity, K> extends SiddhiEvaluationHandler<T, K> { - String getExecutorId(); - - int getPartitionSeq(); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java deleted file mode 100644 index 2e8fc55..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.policy.siddhi; - -import org.apache.eagle.policy.PolicyEvaluationContext; -import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity; - -import java.util.List; - -public interface SiddhiEvaluationHandler<T extends AbstractPolicyDefinitionEntity, K> { - - void onEvalEvents(PolicyEvaluationContext<T, K> context, List<K> alerts); -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties b/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties deleted file mode 100644 index d59ded6..0000000 --- a/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -log4j.rootLogger=INFO, stdout - -# standard output -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java index 86d8bc4..e2ac91a 100644 --- a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java +++ b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java @@ -30,7 +30,7 @@ import com.typesafe.config.ConfigFactory; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; import org.apache.eagle.app.sink.StormStreamSink; -import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider; +import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider; import org.apache.eagle.gc.executor.GCLogAnalyzerBolt; import org.apache.eagle.gc.executor.GCMetricGeneratorBolt; import storm.kafka.StringScheme; @@ -47,7 +47,7 @@ public class GCLogApplication extends StormApplication{ @Override public StormTopology execute(Config config, StormEnvironment environment) { TopologyBuilder builder = new TopologyBuilder(); - NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider(); + KafkaSpoutProvider provider = new KafkaSpoutProvider(); IRichSpout spout = provider.getSpout(config); int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/pom.xml b/eagle-hadoop-metric/pom.xml index 0d612df..15eea00 100644 --- a/eagle-hadoop-metric/pom.xml +++ b/eagle-hadoop-metric/pom.xml @@ -36,28 +36,4 @@ <version>${project.version}</version> </dependency> </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <configuration> - <descriptor>src/assembly/eagle-hadoop-metric-assembly.xml</descriptor> - <finalName>eagle-hadoop-metric-${project.version}</finalName> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <tarLongFileMode>posix</tarLongFileMode> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - </project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/assembly/eagle-hadoop-metric-assembly.xml ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/assembly/eagle-hadoop-metric-assembly.xml b/eagle-hadoop-metric/src/assembly/eagle-hadoop-metric-assembly.xml deleted file mode 100644 index b581fbc..0000000 --- a/eagle-hadoop-metric/src/assembly/eagle-hadoop-metric-assembly.xml +++ /dev/null @@ -1,64 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>assembly</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <dependencySets> - <dependencySet> - <outputDirectory>/</outputDirectory> - <useProjectArtifact>false</useProjectArtifact> - <unpack>true</unpack> - <scope>runtime</scope> - <unpackOptions> - <excludes> - <exclude>**/application.conf</exclude> - <exclude>**/defaults.yaml</exclude> - <exclude>**/*storm.yaml</exclude> - <exclude>**/*storm.yaml.1</exclude> - <exclude>**/log4j.properties</exclude> - </excludes> - </unpackOptions> - <excludes> - <exclude>org.apache.storm:storm-core</exclude> - <exclude>org.slf4j:slf4j-api</exclude> - <exclude>org.slf4j:log4j-over-slf4j</exclude> - <exclude>org.slf4j:slf4j-log4j12</exclude> - <exclude>log4j:log4j</exclude> - <exclude>asm:asm</exclude> - <exclude>org.apache.log4j.wso2:log4j</exclude> - </excludes> - </dependencySet> - </dependencySets> - - <fileSets> - <fileSet> - <directory>${project.build.outputDirectory}</directory> - <outputDirectory>/</outputDirectory> - <excludes> - <exclude>application.conf</exclude> - <exclude>log4j.properties</exclude> - <exclude>**/storm.yaml.1</exclude> - </excludes> - </fileSet> - </fileSets> -</assembly> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java index 20ef5d0..40d1a24 100644 --- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java +++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java @@ -18,19 +18,51 @@ package org.apache.eagle.hadoop.metric; import backtype.storm.generated.StormTopology; +import backtype.storm.topology.BoltDeclarer; +import backtype.storm.topology.IRichSpout; import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.app.sink.StormStreamSink; +import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider; +import storm.kafka.StringScheme; /** * Since 8/12/16. + * This application just pass through data from jmx metric + * For persistence or alert purpose, it is not necessary to start application + * But keep this application in case of future business process + * + * Note: this application should be run as multiple instances based on different topic for data source */ public class HadoopJmxApplication extends StormApplication { + public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks"; + public final static String PARSER_TASK_NUM = "topology.numOfParserTasks"; + public final static String SINK_TASK_NUM = "topology.numOfSinkTasks"; + @Override public StormTopology execute(Config config, StormEnvironment environment) { + int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); + int numOfParserTasks = config.getInt(PARSER_TASK_NUM); + int numOfSinkTasks = config.getInt(SINK_TASK_NUM); + TopologyBuilder builder = new TopologyBuilder(); + + KafkaSpoutProvider provider = new KafkaSpoutProvider(); + IRichSpout spout = provider.getSpout(config); + builder.setSpout("ingest", spout, numOfSpoutTasks); + + JsonParserBolt bolt = new JsonParserBolt(); + BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", bolt, numOfParserTasks); + boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY)); + + StormStreamSink sinkBolt = environment.getStreamSink("hadoop_jmx_stream",config); + BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks); + kafkaBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1")); + return builder.createTopology(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java new file mode 100644 index 0000000..7ca5ba6 --- /dev/null +++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.hadoop.metric; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Map; + +/** + * Since 8/14/16. + */ +public class JsonParserBolt extends BaseRichBolt { + private Logger LOG = LoggerFactory.getLogger(JsonParserBolt.class); + private OutputCollector collector; + private ObjectMapper mapper = new ObjectMapper(); + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple input) { + String msg = input.getString(0); + try { + Map ret = mapper.readValue(msg, Map.class); + collector.emit(Arrays.asList(ret)); + }catch(Exception ex){ + LOG.error("error in passing json message", ex); + }finally{ + collector.ack(input); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("msg")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java deleted file mode 100644 index 173441c..0000000 --- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.hadoop.metric; - -import backtype.storm.spout.SchemeAsMultiScheme; -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme; - -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -/** - * Created on 1/25/16. - */ -public class Utils { - - /** - * Creates a spout provider that have host-metric as the first tuple data, so that it's feasible for alert grouping. - * - * @param config - * @return - */ - public static KafkaSourcedSpoutProvider createProvider(Config config) { - String deserClsName = config.getString("dataSourceConfig.deserializerClass"); - final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) { - - @Override - public List<Object> deserialize(byte[] ser) { - Object tmp = deserializer.deserialize(ser); - Map<String, Object> map = (Map<String, Object>) tmp; - if (tmp == null) return null; - // this is the key to be grouped by - return Arrays.asList(String.format("%s-%s", map.get("host"), map.get("metric")), tmp); - } - - }; - - KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() { - - @Override - public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) { - return new SchemeAsMultiScheme(scheme); - } - - }; - return provider; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/main/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-hadoop-metric/src/main/resources/application.conf b/eagle-hadoop-metric/src/main/resources/application.conf index dc1c7f3..e75355f 100644 --- a/eagle-hadoop-metric/src/main/resources/application.conf +++ b/eagle-hadoop-metric/src/main/resources/application.conf @@ -14,56 +14,38 @@ # limitations under the License. { - "envContextConfig" : { - "env" : "storm", - "mode" : "local", - "topologyName" : "hadoopJmxMetricTopology", - "stormConfigFile" : "hadoopjmx.yaml", - "parallelismConfig" : { - "kafkaMsgConsumer" : 1, - "hadoopJmxMetricAlertExecutor*" : 1 - } + "appId" : "HadoopJmxApplication", + "mode" : "LOCAL", + "siteId" : "testsite", + "topology" : { + "numOfSpoutTasks" : 2, + "numOfParserTasks" : 2, + "numOfSinkTasks" : 2 }, "dataSourceConfig": { - "topic" : "nn_jmx_metric_sandbox", - "zkConnection" : "sandbox.hortonworks.com:2181", + "topic" : "jmx_metric", + "zkConnection" : "server.eagle.apache.org:2181", "zkConnectionTimeoutMS" : 15000, "consumerGroupId" : "EagleConsumer", "fetchSize" : 1048586, - "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer", - "transactionZKServers" : "sandbox.hortonworks.com", + "transactionZKServers" : "server.eagle.apache.org", "transactionZKPort" : 2181, "transactionZKRoot" : "/consumers", "transactionStateUpdateMS" : 2000 - }, - "alertExecutorConfigs" : { - "hadoopJmxMetricAlertExecutor" : { - "parallelism" : 1, - "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner" - "needValidation" : "true" - } + "schemeCls" : "storm.kafka.StringScheme" }, "eagleProps" : { - "site" : "sandbox", - "application": "hadoopJmxMetricDataSource", - "dataJoinPollIntervalSec" : 30, - "mailHost" : "mailHost.com", - "mailSmtpPort":"25", - "mailDebug" : "true", - "balancePartitionEnabled" : true, - #"partitionRefreshIntervalInMin" : 60, - #"kafkaStatisticRangeInMin" : 60, "eagleService": { "host": "localhost", - "port": 9099, + "port": 9090, "username": "admin", "password": "secret" } - "readHdfsUserCommandPatternFrom" : "file" }, - "dynamicConfigSource" : { - "enabled" : true, - "initDelayMillis" : 0, - "delayMillis" : 30000 + "dataSinkConfig": { + "topic" : "jmx_metric_parsed", + "brokerList" : "server.eagle.apache.org:6667", + "serializerClass" : "kafka.serializer.StringEncoder", + "keySerializerClass" : "kafka.serializer.StringEncoder" } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java index c738b90..4ef50ea 100644 --- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java @@ -17,7 +17,6 @@ package org.apache.eagle.metric.kafka; import backtype.storm.generated.StormTopology; -import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.BoltDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichSpout; @@ -26,16 +25,8 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider; -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import storm.kafka.BrokerHosts; -import storm.kafka.KafkaSpout; -import storm.kafka.SpoutConfig; -import storm.kafka.ZkHosts; - -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -62,69 +53,9 @@ public class EagleMetricCollectorApplication extends StormApplication{ } }; - // TODO: Refactored the anonymous in to independen class file, avoiding too complex logic in main method - KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() { - @Override - public BaseRichSpout getSpout(Config context) { - // Kafka topic - String topic = context.getString("dataSourceConfig.topic"); - // Kafka consumer group id - String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId"); - // Kafka fetch size - int fetchSize = context.getInt("dataSourceConfig.fetchSize"); - // Kafka deserializer class - String deserClsName = context.getString("dataSourceConfig.deserializerClass"); - - // Kafka broker zk connection - String zkConnString = context.getString("dataSourceConfig.zkQuorum"); - - // transaction zkRoot - String zkRoot = context.getString("dataSourceConfig.transactionZKRoot"); - - LOG.info(String.format("Use topic id: %s",topic)); - - String brokerZkPath = null; - if(context.hasPath("dataSourceConfig.brokerZkPath")) { - brokerZkPath = context.getString("dataSourceConfig.brokerZkPath"); - } - - BrokerHosts hosts; - if(brokerZkPath == null) { - hosts = new ZkHosts(zkConnString); - } else { - hosts = new ZkHosts(zkConnString, brokerZkPath); - } - - SpoutConfig spoutConfig = new SpoutConfig(hosts, - topic, - zkRoot + "/" + topic, - groupId); - - // transaction zkServers - String[] zkConnections = zkConnString.split(","); - List<String> zkHosts = new ArrayList<>(); - for (String zkConnection : zkConnections) { - zkHosts.add(zkConnection.split(":")[0]); - } - Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]); - - spoutConfig.zkServers = zkHosts; - // transaction zkPort - spoutConfig.zkPort = zkPort; - // transaction update interval - spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS"); - // Kafka fetch size - spoutConfig.fetchSizeBytes = fetchSize; - - spoutConfig.scheme = new SchemeAsMultiScheme(scheme); - KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); - return kafkaSpout; - } - }; - TopologyBuilder builder = new TopologyBuilder(); BaseRichSpout spout1 = new KafkaOffsetSourceSpoutProvider().getSpout(config); - BaseRichSpout spout2 = kafkaMessageSpoutProvider.getSpout(config); + BaseRichSpout spout2 = KafkaSourcedSpoutProvider.getSpout(config, scheme); int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM); int numOfDistributionTasks = config.getInt(DISTRIBUTION_TASK_NUM); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutProvider.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutProvider.java new file mode 100644 index 0000000..382ae1a --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutProvider.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric.kafka; + +import backtype.storm.spout.Scheme; +import backtype.storm.spout.SchemeAsMultiScheme; +import backtype.storm.topology.base.BaseRichSpout; +import com.typesafe.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import storm.kafka.BrokerHosts; +import storm.kafka.KafkaSpout; +import storm.kafka.SpoutConfig; +import storm.kafka.ZkHosts; + +import java.util.ArrayList; +import java.util.List; + +/** + * Since 8/14/16. + */ +public class KafkaSourcedSpoutProvider { + private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class); + public static BaseRichSpout getSpout(Config context, Scheme scheme) { + // Kafka topic + String topic = context.getString("dataSourceConfig.topic"); + // Kafka consumer group id + String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId"); + // Kafka fetch size + int fetchSize = context.getInt("dataSourceConfig.fetchSize"); + // Kafka deserializer class + String deserClsName = context.getString("dataSourceConfig.deserializerClass"); + + // Kafka broker zk connection + String zkConnString = context.getString("dataSourceConfig.zkQuorum"); + + // transaction zkRoot + String zkRoot = context.getString("dataSourceConfig.transactionZKRoot"); + + LOG.info(String.format("Use topic id: %s",topic)); + + String brokerZkPath = null; + if(context.hasPath("dataSourceConfig.brokerZkPath")) { + brokerZkPath = context.getString("dataSourceConfig.brokerZkPath"); + } + + BrokerHosts hosts; + if(brokerZkPath == null) { + hosts = new ZkHosts(zkConnString); + } else { + hosts = new ZkHosts(zkConnString, brokerZkPath); + } + + SpoutConfig spoutConfig = new SpoutConfig(hosts, + topic, + zkRoot + "/" + topic, + groupId); + + // transaction zkServers + String[] zkConnections = zkConnString.split(","); + List<String> zkHosts = new ArrayList<>(); + for (String zkConnection : zkConnections) { + zkHosts.add(zkConnection.split(":")[0]); + } + Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]); + + spoutConfig.zkServers = zkHosts; + // transaction zkPort + spoutConfig.zkPort = zkPort; + // transaction update interval + spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS"); + // Kafka fetch size + spoutConfig.fetchSizeBytes = fetchSize; + + spoutConfig.scheme = new SchemeAsMultiScheme(scheme); + KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); + return kafkaSpout; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java new file mode 100644 index 0000000..14b2384 --- /dev/null +++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.metric.kafka; + +import backtype.storm.spout.Scheme; +import backtype.storm.tuple.Fields; +import com.typesafe.config.Config; +import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer; +import org.apache.eagle.datastream.utils.NameConstants; + +import java.lang.reflect.Constructor; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +/** + * This scheme defines how a kafka message is deserialized and the output field name for storm stream + * it includes the following: + * 1. data source is kafka, so need kafka message deserializer class + * 2. output field declaration + */ +public class KafkaSourcedSpoutScheme implements Scheme { + protected SpoutKafkaMessageDeserializer deserializer; + + public KafkaSourcedSpoutScheme(String deserClsName, Config context){ + try{ + Properties prop = new Properties(); + if(context.hasPath("eagleProps")) { + prop.putAll(context.getObject("eagleProps")); + } + Constructor<?> constructor = Class.forName(deserClsName).getConstructor(Properties.class); + deserializer = (SpoutKafkaMessageDeserializer) constructor.newInstance(prop); + }catch(Exception ex){ + throw new RuntimeException("Failed to create new instance for decoder class " + deserClsName, ex); + } + } + + @Override + public List<Object> deserialize(byte[] ser) { + Object tmp = deserializer.deserialize(ser); + if(tmp == null) + return null; + // the following tasks are executed within the same process of kafka spout + return Arrays.asList(tmp); + } + + /** + * Default only f0, but it requires to be overrode if different + * + * TODO: Handle the schema with KeyValue based structure + * + * @return Fields + */ + @Override + public Fields getOutputFields() { + return new Fields(NameConstants.FIELD_PREFIX()+"0"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java index 432043f..f5753cf 100644 --- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java +++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java @@ -26,7 +26,7 @@ import com.typesafe.config.ConfigFactory; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; import org.apache.eagle.app.sink.StormStreamSink; -import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider; +import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider; /** * Since 7/27/16. @@ -40,7 +40,7 @@ public class HBaseAuditLogApplication extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { TopologyBuilder builder = new TopologyBuilder(); - NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider(); + KafkaSpoutProvider provider = new KafkaSpoutProvider(); IRichSpout spout = provider.getSpout(config); HBaseAuditLogParserBolt bolt = new HBaseAuditLogParserBolt();
