http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf deleted file mode 100644 index 85b5334..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_5.conf +++ /dev/null @@ -1,110 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing software -# distributed under the License is distributed on an "AS IS" BASIS -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -{ - config { - envContextConfig { - "env" : "storm" - "mode" : "cluster" - "topologyName" : "dynamical-topology-5" - "parallelismConfig" : { - "kafkaMsgConsumer" : 1 - }, - "nimbusHost":"sandbox.hortonworks.com", - "nimbusThriftPort":6627 - } - alertExecutorConfigs { - defaultAlertExecutor { - "parallelism" : 1 - "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner" - "needValidation" : "true" - } - } - eagleProps { - "site" : "sandbox" - "application": "HADOOP" - } - } - - dataflow { - KafkaSource.JmxStreamOne { - parallism = 1000 - topic = "metric_event_1" - zkConnection = "sandbox.hortonworks.com:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "sandbox.hortonworks.com" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - KafkaSource.JmxStreamTwo { - parallism = 1000 - topic = "metric_event_2" - zkConnection = "sandbox.hortonworks.com:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "sandbox.hortonworks.com" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - KafkaSource.JmxStreamThree{ - parallism = 1000 - topic = "metric_event_3" - zkConnection = "sandbox.hortonworks.com:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "sandbox.hortonworks.com" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - Console.printer { - format = "%s" - } - - KafkaSink.metricStore { - "topic" = "metric_event_persist" - "bootstrap.servers" = "sandbox.hortonworks.com:6667" - } - - Alert.defaultAlertExecutor { - // upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree] - // alertExecutorId = defaultAlertExecutor - } - - JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor { - grouping = shuffle - } - - JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore { - grouping = shuffle - } - - JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer { - grouping = shuffle - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala deleted file mode 100644 index 7b552da..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala +++ /dev/null @@ -1,37 +0,0 @@ -package org.apache.eagle.stream.pipeline - -import com.typesafe.config.ConfigFactory -import org.scalatest.{FlatSpec, Matchers} - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -class ConfigSpec extends FlatSpec with Matchers{ - "Config" should "be overrode correctly" in { - val conf1 = ConfigFactory.parseString( - """ - |value=1 - """.stripMargin) - val conf2 = ConfigFactory.parseString( - """ - |value=2 - """.stripMargin) - val conf3 = conf1.withFallback(conf2) - val conf4 = conf2.withFallback(conf1) - conf3.getInt("value") should be(1) - conf4.getInt("value") should be(2) - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala deleted file mode 100644 index e63280a..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.stream.pipeline - -import com.typesafe.config.ConfigFactory -import org.apache.eagle.stream.pipeline.parser.{ConnectionIdentifier, DataFlow, DefinitionIdentifier, Identifier} -import org.scalatest.{FlatSpec, Matchers} - -class DataFlowSpec extends FlatSpec with Matchers { - val dataFlowConfig = - """ - |{ - | kafkaSource.metric_event_1 { - | schema { - | metric: string - | timestamp: long - | value: double - | } - | parallism = 1000 - | topic = "metric_event_1" - | zkConnection = "localhost:2181" - | zkConnectionTimeoutMS = 15000 - | consumerGroupId = "Consumer" - | fetchSize = 1048586 - | transactionZKServers = "localhost" - | transactionZKPort = 2181 - | transactionZKRoot = "/consumers" - | transactionStateUpdateMS = 2000 - | deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - | } - | - | kafkaSource.metric_event_2 { - | schema = { - | metric: string - | timestamp: long - | value: double - | } - | parallism = 1000 - | topic = "metric_event_2" - | zkConnection = "localhost:2181" - | zkConnectionTimeoutMS = 15000 - | consumerGroupId = "Consumer" - | fetchSize = 1048586 - | transactionZKServers = "localhost" - | transactionZKPort = 2181 - | transactionZKRoot = "/consumers" - | transactionStateUpdateMS = 2000 - | deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - | } - | - | kafkaSink.metricStore {} - | - | alert.alert { - | executor = "alertExecutor" - | } - | - | aggregator.aggreator { - | executor = "aggreationExecutor" - | } - | - | metric_event_1|metric_event_2 -> alert {} - | metric_event_1|metric_event_2 -> metricStore {} - |} - """.stripMargin - - DataFlow.getClass.toString should "parse dataflow end-to-end correctly" in { - val config = ConfigFactory.parseString(dataFlowConfig) - config should not be null - val dataflow = DataFlow.parse(config) - dataflow should not be null - dataflow.getConnectors.size should be(4) - dataflow.getProcessors.size should be(5) - } - - Identifier.getClass.toString should "parse as definition" in { - val defId = Identifier.parse("kafka").asInstanceOf[DefinitionIdentifier] - defId.moduleType should be("kafka") - } - - Identifier.getClass.toString should "parse node1 -> node2 as connection" in { - val id = Identifier.parse("node1 -> node2").asInstanceOf[ConnectionIdentifier] - id.fromIds.size should be(1) - } - - Identifier.getClass.toString should "parse node1|node2 -> node3" in { - val id = Identifier.parse("node1|node2 -> node3").asInstanceOf[ConnectionIdentifier] - id.fromIds.size should be(2) - } - - Identifier.getClass.toString should "parse node1|node2|node3 -> node4 as connection" in { - val id = Identifier.parse("node1|node2|node3 -> node4").asInstanceOf[ConnectionIdentifier] - id.fromIds.size should be(3) - } - - Identifier.getClass.toString should "parse node1 | node2 | node3 -> node4 as connection" in { - val id = Identifier.parse("node1 | node2 | node3 -> node4").asInstanceOf[ConnectionIdentifier] - id.fromIds.size should be(3) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala deleted file mode 100644 index 5e2007d..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.stream.pipeline - -import org.apache.eagle.datastream.ExecutionEnvironments.storm -import org.scalatest.{FlatSpec, Matchers} - -class PipelineSpec extends FlatSpec with Matchers{ - "Pipeline" should "parse successfully from pipeline_1.conf" in { - val pipeline = Pipeline.parseResource("pipeline_1.conf") - pipeline should not be null - } - - "Pipeline" should "compile successfully from pipeline_2.conf" in { - val pipeline = Pipeline.parseResource("pipeline_2.conf") - pipeline should not be null - val stream = Pipeline.compile(pipeline) - stream should not be null - // Throw ClassNotFoundException when submit in unit test - // stream.submit[storm] - } -} - -/** - * Storm LocalCluster throws ClassNotFoundException when submit in unit test, so here submit in App - */ -object PipelineSpec_2 extends App{ - val pipeline = Pipeline(args).parseResource("pipeline_2.conf") - val stream = Pipeline.compile(pipeline) - stream.submit[storm] -} - -object PipelineSpec_3 extends App { - Pipeline(args).submit[storm]("pipeline_3.conf") -} - -object PipelineSpec_4 extends App { - Pipeline(args).submit[storm]("pipeline_4.conf") -} - -object PipelineSpec_5 extends App { - Pipeline(args).submit[storm]("pipeline_5.conf") -} - -object PipelineCLISpec extends App{ - Pipeline.main(args) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml b/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml index e9be371..5e3fc9e 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/pom.xml @@ -66,11 +66,6 @@ </dependency> <dependency> <groupId>org.apache.eagle</groupId> - <artifactId>eagle-alert-process</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.eagle</groupId> <artifactId>eagle-stream-process-base</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java deleted file mode 100644 index b17c192..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutor.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.aggregate; - -import org.apache.eagle.policy.ResultRender; -import org.apache.eagle.policy.dao.PolicyDefinitionDAO; -import org.apache.eagle.policy.PolicyPartitioner; -import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity; -import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity; -import org.apache.eagle.policy.executor.PolicyProcessExecutor; - -/** - * @since Dec 16, 2015 - * - */ -public class AggregateExecutor extends PolicyProcessExecutor<AggregateDefinitionAPIEntity, AggregateEntity> { - - private static final long serialVersionUID = 1L; - - private ResultRender<AggregateDefinitionAPIEntity, AggregateEntity> render = new AggregateResultRender(); - - public AggregateExecutor(String executorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq, - PolicyDefinitionDAO<AggregateDefinitionAPIEntity> alertDefinitionDao, String[] sourceStreams) { - super(executorId, partitioner, numPartitions, partitionSeq, alertDefinitionDao, sourceStreams, - AggregateDefinitionAPIEntity.class); - } - - @Override - public ResultRender<AggregateDefinitionAPIEntity, AggregateEntity> getResultRender() { - return render; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java deleted file mode 100644 index 5093685..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.aggregate; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigValue; -import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity; -import org.apache.eagle.policy.DefaultPolicyPartitioner; -import org.apache.eagle.policy.PolicyPartitioner; -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.policy.dao.PolicyDefinitionDAO; -import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl; -import org.apache.eagle.policy.executor.IPolicyExecutor; -import org.apache.eagle.service.client.EagleServiceConnector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; - -/** - * @since Dec 16, 2015 - * - */ -public class AggregateExecutorFactory { - - private static final Logger LOG = LoggerFactory.getLogger(AggregateExecutorFactory.class); - - private AggregateExecutorFactory() {} - public static final AggregateExecutorFactory Instance = new AggregateExecutorFactory(); - - - public IPolicyExecutor[] createExecutors(List<String> streamNames, String cql) throws Exception { - int numPartitions = 1; //loadExecutorConfig(config, executorId, partitionerCls); - - IPolicyExecutor[] executors = new IPolicyExecutor[numPartitions]; - String[] upStreams = streamNames.toArray(new String[0]); - for (int i = 0; i < numPartitions ; i++ ) { - executors[i] = new SimpleAggregateExecutor(upStreams, cql, "siddhiCEPEngine", i, numPartitions); - } - - return executors; - } - - public IPolicyExecutor[] createExecutors(Config config, List<String> streamNames, String executorId) throws Exception { - StringBuilder partitionerCls = new StringBuilder(DefaultPolicyPartitioner.class.getCanonicalName()); - int numPartitions = loadExecutorConfig(config, executorId, partitionerCls); - PolicyDefinitionDAO<AggregateDefinitionAPIEntity> policyDefDao = new PolicyDefinitionEntityDAOImpl<>( - new EagleServiceConnector(config), Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME); - return newAggregateExecutors(policyDefDao, streamNames, executorId, numPartitions, partitionerCls.toString()); - } - - @SuppressWarnings("unchecked") - private int loadExecutorConfig(Config config, String executorId, StringBuilder partitionerCls) { - int numPartitions = 0; - String aggregateExecutorConfigsKey = "aggregateExecutorConfigs"; - if(config.hasPath(aggregateExecutorConfigsKey)) { - Map<String, ConfigValue> analyzeExecutorConfigs = config.getObject(aggregateExecutorConfigsKey); - if(analyzeExecutorConfigs !=null && analyzeExecutorConfigs.containsKey(executorId)) { - Map<String, Object> alertExecutorConfig = (Map<String, Object>) analyzeExecutorConfigs.get(executorId).unwrapped(); - int parts = 0; - if(alertExecutorConfig.containsKey("parallelism")) parts = (int) (alertExecutorConfig.get("parallelism")); - numPartitions = parts == 0 ? 1 : parts; - if(alertExecutorConfig.containsKey("partitioner")) { - partitionerCls.setLength(0); - partitionerCls.append((String) alertExecutorConfig.get("partitioner")); - } - } - } - return numPartitions; - } - -// private List<String> findStreamNames(Config config, String executorId, String dataSource) throws Exception { -// // Get map from alertExecutorId to alert stream -// // (dataSource) => Map[alertExecutorId:String,streamName:List[String]] -// List<String> streamNames = new ArrayList<String>(); -// // FIXME : here we reuse the executor definition. But the name alert is not ambiguous now. -// AlertExecutorDAOImpl alertExecutorDAO = new AlertExecutorDAOImpl(new EagleServiceConnector(config)); -// List<AlertExecutorEntity> alertExecutorEntities = alertExecutorDAO.findAlertExecutor(dataSource, -// executorId); -// for (AlertExecutorEntity entity : alertExecutorEntities) { -// streamNames.add(entity.getTags().get(Constants.STREAM_NAME)); -// } -// return streamNames; -// } - - private AggregateExecutor[] newAggregateExecutors(PolicyDefinitionDAO<AggregateDefinitionAPIEntity> alertDefDAO, - List<String> sourceStreams, String executorID, int numPartitions, String partitionerCls) - throws Exception { - LOG.info("Creating aggregator executors with executorID: " + executorID + ", numPartitions: " - + numPartitions + ", Partition class is: " + partitionerCls); - - PolicyPartitioner partitioner = (PolicyPartitioner) Class.forName(partitionerCls).newInstance(); - AggregateExecutor[] alertExecutors = new AggregateExecutor[numPartitions]; - String[] _sourceStreams = sourceStreams.toArray(new String[sourceStreams.size()]); - - for (int i = 0; i < numPartitions; i++) { - alertExecutors[i] = new AggregateExecutor(executorID, partitioner, numPartitions, i, alertDefDAO, - _sourceStreams); - } - return alertExecutors; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java deleted file mode 100644 index 986885a..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateResultRender.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.aggregate; - -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity; -import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity; -import org.apache.eagle.policy.PolicyEvaluationContext; -import org.apache.eagle.policy.ResultRender; - -import java.io.Serializable; -import java.util.List; - -/** - * Created on 12/29/15. - */ -public class AggregateResultRender implements ResultRender<AggregateDefinitionAPIEntity, AggregateEntity>, Serializable { - - - @Override - public AggregateEntity render(Config config, - List<Object> rets, - PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> siddhiAlertContext, - long timestamp) { - AggregateEntity result = new AggregateEntity(); - for (Object o : rets) { - result.add(o); - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java deleted file mode 100644 index e0dadbf..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.aggregate; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.core.JsonSerDeserUtils; -import org.apache.eagle.dataproc.core.ValuesArray; -import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateDefinitionAPIEntity; -import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor2; -import org.apache.eagle.policy.PolicyEvaluationContext; -import org.apache.eagle.policy.PolicyEvaluator; -import org.apache.eagle.policy.PolicyManager; -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.policy.config.AbstractPolicyDefinition; -import org.apache.eagle.policy.executor.IPolicyExecutor; -import org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler; -import org.apache.hadoop.hbase.util.MD5Hash; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple2; - -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Only one policy for one simple aggregate executor - * - * Created on 1/10/16. - */ -public class SimpleAggregateExecutor - extends JavaStormStreamExecutor2<String, AggregateEntity> - implements SiddhiEvaluationHandler<AggregateDefinitionAPIEntity, AggregateEntity>, IPolicyExecutor<AggregateDefinitionAPIEntity, AggregateEntity> { - - private static final Logger LOG = LoggerFactory.getLogger(SimpleAggregateExecutor.class); - - private final String cql; - private final int partitionSeq; - private final int totalPartitionNum; - - private final String[] upStreamNames; - private String policyId; - private String executorId; - private Config config; - private AggregateDefinitionAPIEntity aggDef; - private PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator; - - public SimpleAggregateExecutor(String[] upStreams, String cql, String policyType, int partitionSeq, int totalPartitionNum) { - this.cql = cql; - this.partitionSeq = partitionSeq; - this.upStreamNames = upStreams; - this.totalPartitionNum = totalPartitionNum; - // create an fixed definition policy api entity, and indicate it has full definition - aggDef = new AggregateDefinitionAPIEntity(); - aggDef.setTags(new HashMap<String, String>()); - aggDef.getTags().put(Constants.POLICY_TYPE, policyType); - // TODO make it more general, not only hard code siddhi cep support here. - try { - Map<String,Object> template = new HashMap<>(); - template.put("type","siddhiCEPEngine"); - template.put("expression",this.cql); - template.put("containsDefinition",true); - aggDef.setPolicyDef(new ObjectMapper().writer().writeValueAsString(template)); - } catch (Exception e) { - LOG.error("Simple aggregate generate policy definition failed!", e); - } - aggDef.setCreatedTime(new Date().getTime()); - aggDef.setLastModifiedDate(new Date().getTime()); - aggDef.setName("anonymous-aggregation-def"); - aggDef.setOwner("anonymous"); - aggDef.setEnabled(true); - aggDef.setDescription("anonymous aggregation definition"); - - String random = MD5Hash.getMD5AsHex(cql.getBytes()); - policyId = "anonymousAggregatePolicyId-" + random; - executorId= "anonymousAggregateId-" +random; - } - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void init() { - evaluator = createPolicyEvaluator(aggDef); - } - - /** - * Create PolicyEvaluator instance according to policyType-mapped policy evaluator class - * - * @return PolicyEvaluator instance - */ - @SuppressWarnings({"unchecked", "rawtypes"}) - protected PolicyEvaluator<AggregateDefinitionAPIEntity> createPolicyEvaluator(AggregateDefinitionAPIEntity alertDef) { - String policyType = alertDef.getTags().get(Constants.POLICY_TYPE); - Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType); - if (evalCls == null) { - String msg = "No policy evaluator defined for policy type : " + policyType; - LOG.error(msg); - throw new IllegalStateException(msg); - } - - AbstractPolicyDefinition policyDef = null; - try { - policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class, - PolicyManager.getInstance().getPolicyModules(policyType)); - } catch (Exception ex) { - LOG.error("Fail initial alert policy def: " + alertDef.getPolicyDef(), ex); - } - - PolicyEvaluator<AggregateDefinitionAPIEntity> pe; - PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> context = new PolicyEvaluationContext<>(); - context.policyId = alertDef.getTags().get("policyId"); - context.alertExecutor = this; - context.resultRender = new AggregateResultRender(); - try { - // create evaluator instances - pe = (PolicyEvaluator<AggregateDefinitionAPIEntity>) evalCls - .getConstructor(Config.class, PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, boolean.class) - .newInstance(config, context, policyDef, upStreamNames, false); - } catch (Exception ex) { - LOG.error("Fail creating new policyEvaluator", ex); - LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef()); - throw new IllegalStateException(ex); - } - return pe; - } - - @Override - public void flatMap(List<Object> input, Collector<Tuple2<String, AggregateEntity>> collector) { - if (input.size() != 3) - throw new IllegalStateException("AggregateExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)"); - if (LOG.isDebugEnabled()) LOG.debug("Msg is coming " + input.get(2)); - if (LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + evaluator); - - try { - evaluator.evaluate(new ValuesArray(collector, input.get(1), input.get(2))); - } catch (Exception ex) { - LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex); - } - } - - @Override - public void onEvalEvents(PolicyEvaluationContext<AggregateDefinitionAPIEntity, AggregateEntity> context, List<AggregateEntity> alerts) { - if (alerts != null && !alerts.isEmpty()) { - String policyId = context.policyId; - LOG.info(String.format("Detected %d alerts for policy %s", alerts.size(), policyId)); - Collector outputCollector = context.outputCollector; - PolicyEvaluator<AggregateDefinitionAPIEntity> evaluator = context.evaluator; - for (AggregateEntity entity : alerts) { - synchronized (this) { - outputCollector.collect(new Tuple2(policyId, entity)); - } - if (LOG.isDebugEnabled()) { - LOG.debug("A new alert is triggered: " + executorId + ", partition " + partitionSeq + ", Got an alert with output context: " + entity + ", for policy " + evaluator); - } - } - } - } - - @Override - public String getExecutorId() { - return executorId; - } - - @Override - public int getPartitionSeq() { - return partitionSeq; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java deleted file mode 100644 index 62830ae..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateDefinitionAPIEntity.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.aggregate.entity; - -import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity; -import org.apache.eagle.log.entity.meta.*; -import org.apache.eagle.policy.common.Constants; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -/** - * entity of stream analyze definition - * - */ -@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) -@Table("aggregatedef") -@ColumnFamily("f") -@Prefix("aggregatedef") -@Service(Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME) -@JsonIgnoreProperties(ignoreUnknown = true) -@TimeSeries(false) -@Tags({"site", "dataSource", "executorId", "policyId", "policyType"}) -@Indexes({ - @Index(name="Index_1_aggregateExecutorId", columns = { "executorId" }, unique = true), -}) -@SuppressWarnings("serial") -public class AggregateDefinitionAPIEntity extends AbstractPolicyDefinitionEntity { - - @Column("a") - private String name; - @Column("b") - private String policyDef; - @Column("c") - private String description; - @Column("d") - private boolean enabled; - @Column("e") - private String owner; - @Column("f") - private long lastModifiedDate; - @Column("g") - private long createdTime; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getPolicyDef() { - return policyDef; - } - - public void setPolicyDef(String policyDef) { - this.policyDef = policyDef; - valueChanged("policyDef"); - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - valueChanged("description"); - } - - public boolean isEnabled() { - return enabled; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - valueChanged("enabled"); - } - - public String getOwner() { - return owner; - } - - public void setOwner(String owner) { - this.owner = owner; - valueChanged("owner"); - } - - public long getLastModifiedDate() { - return lastModifiedDate; - } - - public void setLastModifiedDate(long lastModifiedDate) { - this.lastModifiedDate = lastModifiedDate; - valueChanged("lastModifiedDate"); - } - - public long getCreatedTime() { - return createdTime; - } - - public void setCreatedTime(long createdTime) { - this.createdTime = createdTime; - valueChanged("createdTime"); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java deleted file mode 100644 index 64c20b2..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntity.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.aggregate.entity; - -import java.io.Serializable; -import java.util.LinkedList; -import java.util.List; - -/** - * Event entity during stream processing - * - * @since Dec 17, 2015 - * - */ -public class AggregateEntity implements Serializable { - - private static final long serialVersionUID = 5911351515190098292L; - - private List<Object> data = new LinkedList<>(); - - public void add(Object res) { - data.add(res); - } - - public List<Object> getData() { - return data; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java deleted file mode 100644 index 7c932d4..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/entity/AggregateEntityRepository.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.aggregate.entity; - -import org.apache.eagle.log.entity.repo.EntityRepository; - -/** - * Created on 1/6/16. - */ -public class AggregateEntityRepository extends EntityRepository { - public AggregateEntityRepository() { - entitySet.add(AggregateDefinitionAPIEntity.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java deleted file mode 100644 index 0732639..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/IPersistService.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.persist; - -/** - * Interface by the stream framework to storage - * - * @since Dec 19, 2015 - * - */ -public interface IPersistService<T> { - - boolean save(String stream, T apiEntity) throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java deleted file mode 100644 index 2e1754b..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/PersistExecutor.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.persist; - -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity; -import org.apache.eagle.dataproc.impl.persist.druid.KafkaPersistService; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.datastream.JavaStormStreamExecutor2; -import org.apache.eagle.datastream.core.StorageType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Tuple2; - -import java.text.MessageFormat; -import java.util.List; - -/** - * - * TODO: currently only accept to be used after aggregation node (See the AggregateEntity reference here). - * @since Dec 19, 2015 - * - */ -public class PersistExecutor extends JavaStormStreamExecutor2<String, AggregateEntity> { - - private static final Logger LOG = LoggerFactory.getLogger(PersistExecutor.class); - - private Config config; - private IPersistService<AggregateEntity> persistService; - private String persistExecutorId; - private String persistType; - - public PersistExecutor(String persistExecutorId, String persistType) { - this.persistExecutorId = persistExecutorId; - this.persistType = persistType; - } - - @Override - public void prepareConfig(Config config) { - this.config = config; - } - - @Override - public void init() { - if (persistType.equalsIgnoreCase(StorageType.KAFKA().toString())) { - Config subConfig = this.config.getConfig("persistExecutorConfigs" + "." + persistExecutorId); - persistService = new KafkaPersistService(subConfig); - } else { - throw new RuntimeException(String.format("Persist type '%s' not supported yet!", persistService)); - } - } - - @Override - public void flatMap(List<Object> input, Collector<Tuple2<String, AggregateEntity>> collector) { - if (input.size() != 2) { - LOG.error(String.format("Persist executor expect two elements per tuple. But actually got size %d lists!", - input.size())); - return; - } - - String policyId = (String) input.get(0); - AggregateEntity entity = (AggregateEntity) input.get(1); - try { - persistService.save("defaultOutput", entity); - } catch (Exception e) { - LOG.error(MessageFormat.format("persist entity failed: {0}", entity), e); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java deleted file mode 100644 index ea61278..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/AggregateEntitySerializer.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.persist.druid; - -import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.SerializationConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Map; - -/** - * TODO: configurable null handling for serialization?? - * Created on 1/4/16. - */ -public class AggregateEntitySerializer implements - Closeable, AutoCloseable, Serializer<AggregateEntity> { - - private final StringSerializer stringSerializer = new StringSerializer(); - private static final Logger logger = LoggerFactory.getLogger(AggregateEntitySerializer.class); - private static final ObjectMapper om = new ObjectMapper(); - - static { - om.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, true); - } - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - - } - - @Override - public byte[] serialize(String topic, AggregateEntity data) { - String str = null; - try { - str = om.writeValueAsString(data.getData()); - } catch (IOException e) { - logger.error("Kafka serialization for send error!", e); - } - return stringSerializer.serialize(topic, str); - } - - @Override - public void close() { - - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java deleted file mode 100644 index 919b92e..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/persist/druid/KafkaPersistService.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.persist.druid; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigValue; -import org.apache.eagle.dataproc.impl.aggregate.entity.AggregateEntity; -import org.apache.eagle.dataproc.impl.persist.IPersistService; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.serialization.StringSerializer; - -import java.util.*; -import java.util.concurrent.Future; - -/** - * TODO : support more general entity input - * @since Dec 21, 2015 - * - */ -public class KafkaPersistService implements IPersistService<AggregateEntity> { - - private static final String ACKS = "acks"; - private static final String RETRIES = "retries"; - private static final String BATCH_SIZE = "batchSize"; - private static final String LINGER_MS = "lingerMs"; - private static final String BUFFER_MEMORY = "bufferMemory"; - private static final String KEY_SERIALIZER = "keySerializer"; - private static final String VALUE_SERIALIZER = "valueSerializer"; - private static final String BOOTSTRAP_SERVERS = "bootstrap_servers"; - - private KafkaProducer<String, AggregateEntity> producer; - private final Config config; - private final SortedMap<String, String> streamTopicMap; - private final Properties props; - - /** - * <pre> - * props.put("bootstrap.servers", "localhost:4242"); - * props.put("acks", "all"); - * props.put("retries", 0); - * props.put("batch.size", 16384); - * props.put("linger.ms", 1); - * props.put("buffer.memory", 33554432); - * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - * </pre> - */ - public KafkaPersistService(Config config) { - this.config = config; - Config kafkaConfig = config.getConfig("kafka"); - if (kafkaConfig == null) { - throw new IllegalStateException("Druid persiste service failed to find kafka configurations!"); - } - props = new Properties(); - if (kafkaConfig.hasPath(BOOTSTRAP_SERVERS)) { - props.put("bootstrap.servers", kafkaConfig.getString(BOOTSTRAP_SERVERS)); - } - if (kafkaConfig.hasPath(ACKS)) { - props.put(ACKS, kafkaConfig.getString(ACKS)); - } - if (kafkaConfig.hasPath(RETRIES)) { - props.put(RETRIES, kafkaConfig.getInt(RETRIES)); - } - if (kafkaConfig.hasPath(BATCH_SIZE)) { - props.put("batch.size", kafkaConfig.getInt(BATCH_SIZE)); - } - if (kafkaConfig.hasPath(LINGER_MS)) { - props.put("linger.ms", kafkaConfig.getInt(LINGER_MS)); - } - if (kafkaConfig.hasPath(BUFFER_MEMORY)) { - props.put("buffer.memory", kafkaConfig.getLong(BUFFER_MEMORY)); - } - if (kafkaConfig.hasPath(KEY_SERIALIZER)) { - props.put("key.serializer", kafkaConfig.getString(KEY_SERIALIZER)); - } else { - props.put("key.serializer", StringSerializer.class.getCanonicalName()); - } -// if (kafkaConfig.hasPath(VALUE_SERIALIZER)) { -// props.put("value.serializer", kafkaConfig.getString(VALUE_SERIALIZER)); -// } - props.put("value.serializer", AggregateEntitySerializer.class.getCanonicalName()); - - streamTopicMap = new TreeMap<>(); - if (kafkaConfig.hasPath("topics")) { - Config topicConfig = kafkaConfig.getConfig("topics"); - Set<Map.Entry<String, ConfigValue>> topics = topicConfig.entrySet(); - for (Map.Entry<String, ConfigValue> t : topics) { - streamTopicMap.put(t.getKey(), (String) t.getValue().unwrapped()); - } - } - - producer = new KafkaProducer<>(props); - } - - @Override - public boolean save(String stream, AggregateEntity apiEntity) throws Exception { - if (streamTopicMap.get(stream) != null) { - ProducerRecord<String, AggregateEntity> record = new ProducerRecord<>(streamTopicMap.get(stream), apiEntity); - Future<RecordMetadata> future = producer.send(record); - // TODO : more for check the sending status - return true; - } - return false; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java index 2323f39..e9ba3ca 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/HDFSSourcedStormSpoutProvider.java @@ -43,9 +43,6 @@ public class HDFSSourcedStormSpoutProvider implements StormSpoutProvider { if(conf.equalsIgnoreCase("data collection")){ return new DataCollectionHDFSSpout(configContext); } - if(conf.equalsIgnoreCase("user profile generation")){ - return new UserProfileGenerationHDFSSpout(configContext); - } return null; } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java deleted file mode 100644 index e07ee81..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/hdfs/UserProfileGenerationHDFSSpout.java +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.dataproc.impl.storm.hdfs; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; -import java.io.Serializable; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.core.StreamingProcessConstants; -import org.apache.eagle.dataproc.core.ValuesArray; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; - -import com.esotericsoftware.minlog.Log; - -public class UserProfileGenerationHDFSSpout extends HDFSSourcedStormSpoutProvider.HDFSSpout { - - private static final long serialVersionUID = 2274234104008894386L; - private Config configContext; - private TopologyContext _context; - SpoutOutputCollector _collector; - - public class UserProfileData implements Serializable{ - private static final long serialVersionUID = -3315860110144736840L; - private String user; - private List<String> dateTime = new ArrayList<String>(); - private List<Integer> hrInDay = new ArrayList<Integer>(); - private List<String> line = new ArrayList<String>(); - - public String getUser() { - return user; - } - public void setUser(String user) { - this.user = user; - } - public String getDateTime(int index) { - return dateTime.get(index); - } - public List<String> getDateTimes() { - return dateTime; - } - public void setDateTime(String dateTime) { - this.dateTime.add(dateTime); - } - public int getHrInDay(int index) { - return hrInDay.get(index); - } - public List<Integer> getHrsInDay() { - return hrInDay; - } - public void setHrInDay(int hrInDay) { - this.hrInDay.add(hrInDay); - } - public String getLine(int index) { - return line.get(index); - } - public List<String> getLines() { - return line; - } - public void setLine(String line) { - this.line.add(line); - } - - } - - private static final Logger LOG = LoggerFactory.getLogger(UserProfileGenerationHDFSSpout.class); - - public UserProfileGenerationHDFSSpout(Config configContext){ - this.configContext = configContext; - LOG.info("UserProfileGenerationHDFSSpout called"); - } - - public void copyFiles(){ - LOG.info("Inside listFiles()"); - //Configuration conf = new Configuration(); - JobConf conf = new JobConf(); - // _____________ TO TEST THAT CORRECT HADOOP JARs ARE INCLUDED __________________ - ClassLoader cl = ClassLoader.getSystemClassLoader(); - URL[] urls = ((URLClassLoader)cl).getURLs(); - if(LOG.isDebugEnabled()) { - for (URL url : urls) { - LOG.debug(url.getFile()); - } - } - // _________________________________________ - String hdfsConnectionStr = configContext.getString("dataSourceConfig.hdfsConnection"); - LOG.info("HDFS connection string: " + hdfsConnectionStr); - - String hdfsPath = configContext.getString("dataSourceConfig.hdfsPath"); - LOG.info("HDFS path: " + hdfsPath); - - String copyToPath = configContext.getString("dataSourceConfig.copyToPath"); - LOG.info("copyToPath: " + copyToPath); - String srcPathStr = new String("hdfs://" + hdfsConnectionStr + hdfsPath); - Path srcPath = new Path(srcPathStr); - LOG.info("listFiles called"); - LOG.info("srcPath: " + srcPath); - try { - FileSystem fs = srcPath.getFileSystem(conf); - /*CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf); - CompressionCodec codec = codecFactory.getCodec(srcPath); - DataInputStream inputStream = new DataInputStream(codec.createInputStream(fs.open(srcPath))); - */ - - Path destPath = new Path(copyToPath); - LOG.info("Destination path: " + destPath); - String userListFileName = configContext.getString("dataSourceConfig.userList"); - //loggerHDFSSpout.info("userListFileName: " + userListFileName); - List<String> userList = getUser(userListFileName); - for(String user:userList){ - Path finalSrcPath = new Path(srcPath.getName() + "/" + user); - fs.copyToLocalFile(finalSrcPath, destPath); - } - LOG.info("Copy to local succeed"); - fs.close(); - - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - } - - private List<String> getAllFiles(String root, int level){ - - List<String> lists = new ArrayList<String>(); - File rootFile = new File(root); - File[] tempList = rootFile.listFiles(); - if(tempList == null) - return lists; - - for(File temp:tempList){ - if(temp.isDirectory()) - lists.addAll(getAllFiles(temp.getAbsolutePath(), ++level)); - else{ - if(temp.getName().endsWith(".csv")) - lists.add(temp.getAbsolutePath()); - } - } - return lists; - - } - - public List<String> listFiles(String path){ - - LOG.info("Reading from: " + path); - List<String> files = new ArrayList<String>(); - files = getAllFiles(path, 0); - return files; - } - - private List<String> getUser(String listFileName){ - List<String> userList = new ArrayList<String>(); - BufferedReader reader = null; - try{ - InputStream is = getClass().getResourceAsStream(listFileName); - reader = new BufferedReader(new InputStreamReader(is)); - String line = ""; - while((line = reader.readLine()) != null){ - userList.add(line); - LOG.info("User added:" + line); - } - }catch(Exception e){ - e.printStackTrace(); - }finally{ - try { - if(reader != null) - reader.close(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - return userList; - } - - @Override - public void nextTuple() { - LOG.info("Releasing nextTuple"); - - String userListFileName = configContext.getString("dataSourceConfig.userList"); - - //loggerHDFSSpout.info("userListFileName: " + userListFileName); - List<String> userList = getUser(userListFileName); - //loggerHDFSSpout.info("user list size:" + userList.size()); - for(String user: userList){ - LOG.info("Processing user: " + user); - String copyToPath = configContext.getString("dataSourceConfig.copyToPath"); - //loggerHDFSSpout.info("copyToPath: " + copyToPath); - - copyToPath +="/" + user; - List<String> files = listFiles(copyToPath); - LOG.info("Files returned: " + files.size()); - String typeOfFile = configContext.getString("dataSourceConfig.fileFormat"); - //loggerHDFSSpout.info("typeOfFile returned: " + typeOfFile); - UserProfileData usersProfileDataset = new UserProfileData(); - - for(String fileName:files){ - LOG.info("FileName: " + fileName); - usersProfileDataset.setDateTime(fileName.substring(fileName.lastIndexOf("/")+1, fileName.lastIndexOf("."))); - BufferedReader br = null; - Reader decoder = null; - InputStream inStream = null; - - try{ - inStream = new FileInputStream(new File(fileName)); - decoder = new InputStreamReader(inStream); - br = new BufferedReader(decoder); - int lineNo = 0; - String line = ""; - while((line = br.readLine())!= null){ - boolean containsFileHeader = configContext.getBoolean("dataSourceConfig.containsFileHeader"); - //loggerHDFSSpout.info("containsFileHeader returned: " + containsFileHeader); - if(containsFileHeader == true && lineNo == 0){ - // ignore the header column - lineNo++; - continue; - } - //loggerHDFSSpout.info("emitting line from file: " + fileName); - - usersProfileDataset.setLine(line); - usersProfileDataset.setHrInDay(lineNo); - lineNo++; - } - } - catch (Exception e) { - Log.error("File operation failed"); - throw new IllegalStateException(); - }finally{ - try { - if(br != null) - br.close(); - if(decoder != null) - decoder.close(); - if(inStream != null) - inStream.close(); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - } - usersProfileDataset.setUser(user); - _collector.emit(new ValuesArray(user, "HDFSSourcedStormExecutor", usersProfileDataset)); - LOG.info("Emitting data of length: " + usersProfileDataset.getLines().size()); - Utils.sleep(1000); - } - this.close(); - } - - @Override - public void open(Map arg0, TopologyContext context, - SpoutOutputCollector collector) { - _collector = collector; - _context = context; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - // TODO Auto-generated method stub - declarer.declare(new Fields(StreamingProcessConstants.EVENT_PARTITION_KEY, StreamingProcessConstants.EVENT_STREAM_NAME, StreamingProcessConstants.EVENT_ATTRIBUTE_MAP)); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java deleted file mode 100644 index 993a4a2..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaMapperStormExecutor.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.datastream; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import org.apache.eagle.datastream.utils.NameConstants; - -public class JavaMapperStormExecutor extends BaseRichBolt{ - public static class e1 extends JavaMapperStormExecutor { - public e1(JavaMapper mapper){ - super(1, mapper); - } - } - public static class e2 extends JavaMapperStormExecutor { - public e2(JavaMapper mapper){ - super(2, mapper); - } - } - public static class e3 extends JavaMapperStormExecutor { - public e3(JavaMapper mapper){ - super(3, mapper); - } - } - public static class e4 extends JavaMapperStormExecutor { - public e4(JavaMapper mapper){ - super(4, mapper); - } - } - - private JavaMapper mapper; - private OutputCollector collector; - private int numOutputFields; - public JavaMapperStormExecutor(int numOutputFields, JavaMapper mapper){ - this.numOutputFields = numOutputFields; - this.mapper = mapper; - } - - @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - this.collector = collector; - } - - @Override - public void execute(Tuple input) { - List<Object> ret = mapper.map(input.getValues()); - this.collector.emit(ret); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - List<String> fields = new ArrayList<String>(); - for(int i=0; i<numOutputFields; i++){ - fields.add(NameConstants.FIELD_PREFIX() + i); - } - declarer.declare(new Fields(fields)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java deleted file mode 100644 index a485d76..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/JavaStormExecutorForAlertWrapper.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.datastream; - -import java.util.List; -import java.util.SortedMap; - -import com.typesafe.config.Config; -import scala.Tuple2; -import scala.Tuple3; - -public class JavaStormExecutorForAlertWrapper extends JavaStormStreamExecutor3<String, String, SortedMap<Object, Object>>{ - private JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate; - private String streamName; - public JavaStormExecutorForAlertWrapper(JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> delegate, String streamName){ - this.delegate = delegate; - this.streamName = streamName; - } - @Override - public void prepareConfig(Config config) { - delegate.prepareConfig(config); - } - - @Override - public void init() { - delegate.init(); - } - - @Override - public void flatMap(List<Object> input, final Collector<Tuple3<String, String, SortedMap<Object, Object>>> collector) { - Collector delegateCollector = new Collector(){ - @Override - public void collect(Object o) { - Tuple2 tuple2 = (Tuple2)o; - collector.collect(new Tuple3(tuple2._1, streamName, tuple2._2)); - } - }; - delegate.flatMap(input, delegateCollector); - } - - public JavaStormStreamExecutor<Tuple2<String, SortedMap<Object, Object>>> getDelegate() { - return delegate; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties index 80e0aba..d59ded6 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/log4j.properties @@ -13,28 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootLogger=info, stdout, DRFA +log4j.rootLogger=INFO, stdout -eagle.log.dir=./logs -eagle.log.file=eagle.log - - -#log4j.logger.org.apache.eagle.security.auditlog.IPZoneDataJoinBolt=DEBUG -#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinBolt=DEBUG -log4j.logger.org.apache.eagle.security.auditlog.HdfsUserCommandReassembler=DEBUG -#log4j.logger.org.apache.eagle.executor.AlertExecutor=DEBUG # standard output log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n - -# Daily Rolling File Appender -log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender -log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file} -log4j.appender.DRFA.DatePattern=yyyy-MM-dd -## 30-day backup -# log4j.appender.DRFA.MaxBackupIndex=30 -log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout - -# Pattern format: Date LogLevel LoggerName LogMessage -log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala index 90e59cf..c3ddb00 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala @@ -36,37 +36,6 @@ import scala.reflect.runtime.universe._ object ExecutionEnvironments{ type storm = StormExecutionEnvironment - /** - * Use `'''get[StormExecutionEnvironment](config)'''` instead - * - * @param config - * @return - */ - @deprecated("Execution environment should not know implementation of Storm") - def getStorm(config : Config) = new StormExecutionEnvironment(config) - - /** - * Use `'''get[StormExecutionEnvironment]'''` instead - * - * @return - */ - @deprecated("Execution environment should not know implementation of Storm") - def getStorm:StormExecutionEnvironment = { - val config = ConfigFactory.load() - getStorm(config) - } - - /** - * Use `'''get[StormExecutionEnvironment](args)'''` instead - * - * @see get[StormExecutionEnvironment](args) - * @param args - * @return - */ - @deprecated("Execution environment should not know implementation of Storm") - def getStorm(args:Array[String]):StormExecutionEnvironment = { - getStorm(new ConfigOptionParser().load(args)) - } /** * @param typeTag http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala deleted file mode 100644 index 46f4738..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyCompiler.scala +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.core - -trait AbstractTopologyCompiler{ - def buildTopology : AbstractTopologyExecutor -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala deleted file mode 100644 index 1e1664a..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/AbstractTopologyExecutor.scala +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.core - -trait AbstractTopologyExecutor { - def execute -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala deleted file mode 100644 index e3f3050..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream.core - -import com.typesafe.config.{Config, _} - -import scala.reflect.runtime.universe._ - -/** - * @since 12/4/15 - */ -case class Configuration(private var config:Config) extends Serializable{ - def get:Config = config - - def set[T<:AnyRef](key:String,value:T): Unit = { - config = config.withValue(key,ConfigValueFactory.fromAnyRef(value)) - } - - /** - * - * @param key config key - * @param default default value - * @tparam T return type - * @return - */ - def get[T](key:String,default:T=null)(implicit tag:TypeTag[T]):T = { - if(get.hasPath(key)) { - get(key) - } else default - } - - def get[T](key:String)(implicit tag: TypeTag[T]):T = tag.tpe match { - case STRING_TYPE => config.getString(key).asInstanceOf[T] - case TypeTag.Double => get.getDouble(key).asInstanceOf[T] - case TypeTag.Long => get.getLong(key).asInstanceOf[T] - case TypeTag.Int => get.getInt(key).asInstanceOf[T] - case TypeTag.Byte => get.getBytes(key).asInstanceOf[T] - case TypeTag.Boolean => get.getBoolean(key).asInstanceOf[T] - case NUMBER_TYPE => get.getNumber(key).asInstanceOf[T] - case OBJECT_TYPE => get.getObject(key).asInstanceOf[T] - case VALUE_TYPE => get.getValue(key).asInstanceOf[T] - case ANY_REF_TYPE => get.getAnyRef(key).asInstanceOf[T] - case INT_LIST_TYPE => get.getIntList(key).asInstanceOf[T] - case DOUBLE_LIST_TYPE => get.getDoubleList(key).asInstanceOf[T] - case BOOL_LIST_TYPE => get.getBooleanList(key).asInstanceOf[T] - case LONG_LIST_TYPE => get.getLongList(key).asInstanceOf[T] - case _ => throw new UnsupportedOperationException(s"$tag is not supported yet") - } - - val STRING_TYPE = typeOf[String] - val NUMBER_TYPE = typeOf[Number] - val INT_LIST_TYPE = typeOf[List[Int]] - val BOOL_LIST_TYPE = typeOf[List[Boolean]] - val DOUBLE_LIST_TYPE = typeOf[List[Double]] - val LONG_LIST_TYPE = typeOf[List[Double]] - val OBJECT_TYPE = typeOf[ConfigObject] - val VALUE_TYPE = typeOf[ConfigValue] - val ANY_REF_TYPE = typeOf[AnyRef] -} \ No newline at end of file
