http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf deleted file mode 100644 index 4c21a7c..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf +++ /dev/null @@ -1,42 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -### scheduler propertise -appCommandLoaderIntervalSecs = 1 -appHealthCheckIntervalSecs = 5 - -### execution platform properties -envContextConfig.env = "storm" -envContextConfig.url = "http://sandbox.hortonworks.com:8744" -envContextConfig.nimbusHost = "sandbox.hortonworks.com" -envContextConfig.nimbusThriftPort = 6627 -envContextConfig.jarFile = "/dir-to-jar/eagle-topology-0.3.0-incubating-assembly.jar" - -### default topology properties -eagleProps.mailHost = "mailHost.com" -eagleProps.mailSmtpPort = "25" -eagleProps.mailDebug = "true" -eagleProps.eagleService.host = "localhost" -eagleProps.eagleService.port = 9099 -eagleProps.eagleService.username = "admin" -eagleProps.eagleService.password = "secret" -eagleProps.dataJoinPollIntervalSec = 30 - -dynamicConfigSource.enabled = true -dynamicConfigSource.initDelayMillis = 0 -dynamicConfigSource.delayMillis = 30000 - -
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties deleted file mode 100644 index 25331ab..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties +++ /dev/null @@ -1,35 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -log4j.rootLogger=INFO, stdout - - eagle.log.dir=../logs - eagle.log.file=eagle.log - -# standard output -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n - -# Daily Rolling File Appender - log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender - log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file} - log4j.appender.DRFA.DatePattern=.yyyy-MM-dd -## 30-day backup -# log4j.appender.DRFA.MaxBackupIndex=30 - log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout - -# Pattern format: Date LogLevel LoggerName LogMessage -log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala deleted file mode 100644 index e87ee92..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.stream.application.scheduler - -import com.typesafe.config.Config -import org.apache.eagle.stream.application.TopologyExecutable -import org.slf4j.LoggerFactory - -class MockTopology extends TopologyExecutable { - private val LOG = LoggerFactory.getLogger(classOf[MockTopology]) - override def submit(topology: String, config: Config): Unit = { - LOG.info(s"$topology is running") - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala deleted file mode 100644 index 1cad3a7..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala +++ /dev/null @@ -1,40 +0,0 @@ -package org.apache.eagle.stream.application.scheduler - -import com.typesafe.config.ConfigFactory -import org.apache.eagle.common.config.EagleConfigConstants -import org.apache.eagle.stream.application.ExecutionPlatform -import org.apache.eagle.stream.application.impl.StormExecutionPlatform - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - - -object StormApplicationManagerSpec extends App { - val manager: ExecutionPlatform = new StormExecutionPlatform - val baseConfig = ConfigFactory.load() - val topoConfigStr: String = "webConfig{\"hbase.zookeeper.property.clientPort\":\"2181\", \"hbase.zookeeper.quorum\":\"localhost\"}\nappConfig{\n \"envContextConfig\" : {\n \"env\" : \"storm\",\n \"mode\" : \"cluster\",\n \"topologyName\" : \"sandbox-hbaseSecurityLog-topology\",\n \"stormConfigFile\" : \"security-auditlog-storm.yaml\",\n \"parallelismConfig\" : {\n \"kafkaMsgConsumer\" : 1,\n \"hbaseSecurityLogAlertExecutor*\" : 1\n }\n },\n \"dataSourceConfig\": {\n \"topic\" : \"sandbox_hbase_security_log\",\n \"zkConnection\" : \"127.0.0.1:2181\",\n \"zkConnectionTimeoutMS\" : 15000,\n \"brokerZkPath\" : \"/brokers\",\n \"fetchSize\" : 1048586,\n \"deserializerClass\" : \"org.apache.eagle.security.hbase.parse.HbaseAuditLogKafkaDeserializer\",\n \"transactionZKServers\" : \"127.0.0.1\",\n \"transactionZKPort\" : 2181,\n \"transactionZKRoot\" : \"/consumers\",\n \"consumerGroupId\" : \"eagle.hbasesecurity.consumer\",\n \"transactionStateUpdateMS\" : 2000\n },\n \"alertExecutorConfigs\" : {\n \"hbaseSecurityLogAlertExecutor\" : {\n \"parallelism\" : 1,\n \"partitioner\" : \"org.apache.eagle.policy.DefaultPolicyPartitioner\"\n \"needValidation\" : \"true\"\n }\n },\n \"eagleProps\" : {\n \"site\" : \"sandbox\",\n \"application\": \"hbaseSecurityLog\",\n \"dataJoinPollIntervalSec\" : 30,\n \"mailHost\" : \"mailHost.com\",\n \"mailSmtpPort\":\"25\",\n \"mailDebug\" : \"true\",\n \"eagleService\": {\n \"host\": \"localhost\",\n \"port\": 9099\n \"username\": \"admin\",\n \"password\": \"secret\"\n }\n },\n \"dynamicConfigSource\" : {\n \"enabled\" : true,\n \"initDelayMillis\" : 0,\n \"delayMillis\" : 30000\n }\n}" - - val topoConfig = ConfigFactory.parseString(topoConfigStr) - val conf = topoConfig.getConfig(EagleConfigConstants.APP_CONFIG).withFallback(baseConfig) - - //val (ret, nextState) = manager.execute("START", topologyDescModel, null, conf) - //println(s"Result: ret=$ret, nextState=$nextState") -} - - - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala deleted file mode 100644 index 3db2d67..0000000 --- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.stream.application.scheduler - -import akka.actor.{ActorSystem, Props} -import akka.testkit.{TestActorRef, TestKit} -import com.typesafe.config.ConfigFactory -import org.scalatest.{Ignore, BeforeAndAfterAll, MustMatchers, WordSpecLike} - -@Ignore -class TestSchedulerSpec extends TestKit(ActorSystem("stream-app-scheduler")) -with WordSpecLike with MustMatchers with BeforeAndAfterAll { - - "A Scheduler actor" must { - "Forward a message it receives" in { - val coordinator = TestActorRef[StreamAppCoordinator] - coordinator ! CommandLoaderEvent - expectNoMsg() - } - } - - "A Integrated test" must { - "run end-to-end" in { - val coordinator = system.actorOf(Props[StreamAppCoordinator]) - coordinator ! CommandLoaderEvent - expectNoMsg() - } - } - - override def afterAll(): Unit = { - super.afterAll() - system.shutdown() - } -} - -@Ignore -object TestStreamAppScheduler extends App { - val conf: String = """ - akka.loglevel = "DEBUG" - akka.actor.debug { - receive = on - lifecycle = on - } - """ - new ApplicationScheduler().start(ConfigFactory.parseString(conf)) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/pom.xml b/eagle-core/eagle-app/pom.xml index 6f3069c..5637b01 100644 --- a/eagle-core/eagle-app/pom.xml +++ b/eagle-core/eagle-app/pom.xml @@ -32,7 +32,6 @@ <modules> <module>eagle-app-base</module> - <module>eagle-stream-application-manager</module> <module>eagle-application-service</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md b/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md deleted file mode 100644 index b8a0bdc..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md +++ /dev/null @@ -1,80 +0,0 @@ -<!-- -{% comment %} -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to you under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -{% endcomment %} ---> - -Eagle Declarative Streaming DSL -=============================== - -DSL Format ----------- - - { - config { - config.key = configValue - } - - schema { - metricStreamSchema { - metric: string - value: double - timestamp: long - } - } - - dataflow { - kafkaSource.source1 { - schema = "metricStreamSchema" - } - kafkaSource.source2 { - schema = { - metric: string - value: double - timestamp: long - } - } - } - } - -Usage ------ - - val pipeline = Pipeline.parseResource("pipeline.conf") - val stream = Pipeline.compile(pipeline) - stream.submit[storm] - -Features --------- -* [x] Compile DSL Configure to Pipeline model -* [x] Compile Pipeline model to Stream Execution Graph -* [x] Submit Stream Execution Graph to actual running environment say storm -* [x] Support Alert and Persistence for metric monitoring -* [ ] Extensible stream module management and automatically scan and register module -* [x] Pipeline runner CLI tool and shell script -* [ ] Decouple pipeline compiler and scheduler into individual modules -* [ ] Stream Pipeline Scheduler -* [ ] Graph editor to define streaming graph in UI -* [?] JSON/Config & Scala Case Class Mapping (https://github.com/scala/pickling) -* [?] Raw message structure oriented programing is a little ugly, we should define a generic message/event consist of [payload:stream/timestamp/serializer/deserializer,data:message] -* [ ] Provide stream schema inline and send to metadata when submitting -* [ ] UI should support specify executorId when defining new stream -* [ ] Lack of a entity named StreamEntity for the workflow of defining topology&policy end-to-end -* [!] Fix configuration conflict, should pass through Config instead of ConfigFactory.load() manually -* [ ] Override application configuration with pipeline configuration -* [ ] Refactor schema registration structure and automatically submit stream schema when submitting pipeline -* [ ] Submit alertStream, alertExecutorId mapping to AlertExecutorService when submitting pipeline -* [x] Supports `inputs` field to define connector \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml deleted file mode 100644 index 18fb610..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml +++ /dev/null @@ -1,156 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>eagle-data-process-parent</artifactId> - <groupId>org.apache.eagle</groupId> - <version>0.5.0-incubating-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - <artifactId>eagle-stream-pipeline</artifactId> - <dependencies> - <!--<dependency>--> - <!--<groupId>org.reflections</groupId>--> - <!--<artifactId>reflections</artifactId>--> - <!--</dependency>--> - <dependency> - <groupId>org.apache.eagle</groupId> - <artifactId>eagle-service-base</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.ow2.asm</groupId> - <artifactId>asm-all</artifactId> - </exclusion> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm</artifactId> - </exclusion> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm-all</artifactId> - </exclusion> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm-commons</artifactId> - </exclusion> - <exclusion> - <groupId>asm</groupId> - <artifactId>asm-tree</artifactId> - </exclusion> - <exclusion> - <artifactId>servlet-api</artifactId> - <groupId>javax.servlet</groupId> - </exclusion> - <exclusion> - <artifactId>slf4j-log4j12</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.eagle</groupId> - <artifactId>eagle-storage-base</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </dependency> -<!-- <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <scope>test</scope> - </dependency> ---> - <!--<dependency>--> - <!--<groupId>org.scala-lang</groupId>--> - <!--<artifactId>scala-reflect</artifactId>--> - <!--</dependency>--> - <!--<dependency>--> - <!--<groupId>org.scala-lang</groupId>--> - <!--<artifactId>scala-compiler</artifactId>--> - <!--<version>${scala.version}.0</version>--> - <!--</dependency>--> - <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.version}</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.eagle</groupId> - <artifactId>eagle-stream-process-api</artifactId> - <version>${project.version}</version> - </dependency> - <!--<dependency>--> - <!--<groupId>com.typesafe.akka</groupId>--> - <!--<artifactId>akka-testkit_${scala.version}</artifactId>--> - <!--<version>${akka.actor.version}</version>--> - <!--<scope>test</scope>--> - <!--</dependency>--> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <executions> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>add-source</goal> - <goal>compile</goal> - </goals> - </execution> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <skipTests>true</skipTests> - </configuration> - </plugin> - <plugin> - <groupId>org.scalatest</groupId> - <artifactId>scalatest-maven-plugin</artifactId> - <configuration> - <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> - <junitxml>.</junitxml> - <filereports>TestSuite.txt</filereports> - </configuration> - <executions> - <execution> - <id>test</id> - <goals> - <goal>test</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala deleted file mode 100644 index 65ab390..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.stream.pipeline - - -import org.apache.eagle.stream.pipeline.compiler.PipelineCompiler -import org.apache.eagle.stream.pipeline.parser.PipelineParser -import org.apache.eagle.stream.pipeline.runner.PipelineRunner - -object Pipeline - extends PipelineRunner - with PipelineParser - with PipelineCompiler \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala deleted file mode 100644 index 2ff81d4..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.stream.pipeline.annotation - -import scala.annotation.StaticAnnotation - -case class Extension(extType:String) extends StaticAnnotation \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala deleted file mode 100644 index df8bbe5..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.stream.pipeline.compiler - - -import org.apache.eagle.datastream.core._ -import org.apache.eagle.stream.pipeline.extension.ModuleManager._ -import org.apache.eagle.stream.pipeline.parser._ -import org.apache.eagle.stream.pipeline.utils.CompileException - -trait PipelineCompiler { - def compile(pipeline:Pipeline):StreamContext = { - val context = new StreamContext(pipeline.config) - val dataflow = pipeline.dataflow - val dag = new StreamDAG(context.dag) - dataflow.getProcessors.map(buildStreamProducer(dag,_)).foreach(producer =>{ - producer.initWith(dag.graph,pipeline.config) - dag.addVertex(producer) - }) - dataflow.getConnectors.foreach(connector =>{ - val from = dag.getNodeByName(connector.from).get - val to = dag.getNodeByName(connector.to).get - dag.addEdge(from,to,buildStreamConnector(from,to,dataflow,connector)) - }) - context - } - private def buildStreamProducer(dag:StreamDAG,processor:Processor):StreamProducer[Any] = { - if(findModuleType(processor.getType)){ - getModuleMapperByType(processor.getType).map(processor).nameAs(processor.getId).stream(processor.streamId) - } else { - throw new CompileException(s"Unknown processor type [${processor.getType}]") - } - } - private def buildStreamConnector(from:StreamProducer[Any],to:StreamProducer[Any],dataflow:DataFlow,connector:Connector):StreamConnector[Any,Any]={ - var groupByIndexes:Seq[Int] = connector.groupByIndexes.orNull - if(groupByIndexes!=null ){ - if(connector.groupByFields.isDefined) throw new CompileException(s"Both ${Connector.GROUP_BY_FIELD_FIELD} and ${Connector.GROUP_BY_INDEX_FIELD} is defined at same time") - } else if(connector.groupByFields.isDefined){ - groupByIndexes = connector.groupByFields.get.map(dataflow.getProcessor(from.name).get.getSchema.get.indexOfAttribute) - } - if(groupByIndexes == null){ - ShuffleConnector(from,to) - } else { - GroupbyFieldsConnector(from,to,groupByIndexes) - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala deleted file mode 100644 index 2174560..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala +++ /dev/null @@ -1,186 +0,0 @@ -package org.apache.eagle.stream.pipeline.extension - -import java.util.Properties -import java.util.concurrent.atomic.AtomicBoolean - -import com.typesafe.config.ConfigFactory -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider -import org.apache.eagle.datastream.core._ -import org.apache.eagle.partition.PartitionStrategy -import org.apache.eagle.stream.pipeline.parser.Processor -import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata} -//import org.slf4j.LoggerFactory - -import scala.collection.JavaConverters._ - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -object ModuleManager{ - def getModuleMapperByType(moduleType:String):ModuleMapper = { - classOfProcessorMapping(moduleType) - } - - def findModuleType(moduleType:String):Boolean = classOfProcessorMapping.contains(moduleType) - - val classOfProcessorMapping = Map[String,ModuleMapper]( - "KafkaSource" -> KafkaSourceStreamProducer, - "KafkaSink" -> KafkaSinkStreamProducer, - "Alert" -> AlertStreamProducer, - "Persistence" -> PersistProducer, - "Aggregator" -> AggregatorProducer, - "Console" -> ConsoleStreamProducer - ) -} - -trait ModuleMapper{ - def getType:String - def map(module:Processor):StreamProducer[Any] -} -object KafkaSourceStreamProducer extends ModuleMapper{ - def getType = "KafkaSource" - override def map(module:Processor): StreamProducer[Any] = { - val config = module.getConfig - new StormSourceProducer[Any](new KafkaSourcedSpoutProvider(null).getSpout(ConfigFactory.parseMap(config.asJava))) - } -} -object KafkaSinkStreamProducer extends ModuleMapper{ - def getType = "KafkaSink" - override def map(module:Processor): StreamProducer[Any] = { - val config = module.getConfig - ForeachProducer[AnyRef](KafkaSinkExecutor(config)) - } -} -object ConsoleStreamProducer extends ModuleMapper{ - override def getType: String = "Stdout" - override def map(module:Processor): StreamProducer[Any] = ForeachProducer[Any](m=>print(s"$m\n")) -} -object AlertStreamProducer extends ModuleMapper{ - def getType:String = "Alert" - override def map(module:Processor): StreamProducer[Any] = { - val config = module.getConfig - val moduleId = module.getId - // Support create functional AlertStreamProducer constructor - new AlertStreamProducer ( - upStreamNames = config.getOrElse("upStreamNames",if(module.inputIds!=null) module.inputIds.asJava else null).asInstanceOf[java.util.List[String]], - alertExecutorId = config.getOrElse("alertExecutorId",moduleId).asInstanceOf[String], - consume = config.getOrElse("consume",true).asInstanceOf[Boolean], - strategy = config.get("strategy") match {case Some(strategy)=> Class.forName(strategy.asInstanceOf[String]).newInstance().asInstanceOf[PartitionStrategy] case None => null} - ) - } -} - -object PersistProducer extends ModuleMapper{ - override def getType = "Persistence" - override def map(module:Processor): StreamProducer[Any] = { - val config = module.getConfig - new PersistProducer(config.getOrElse("executorId",module.getId).asInstanceOf[String],StorageType.withName(config.getOrElse("storageType",null).asInstanceOf[String])) - } -} - -object AggregatorProducer extends ModuleMapper{ - override def getType: String = "Aggregator" - override def map(module:Processor): StreamProducer[Any] = { - val config = module.getConfig - new AggregateProducer( - upStreamNames = config.getOrElse("upStreamNames",if(module.inputIds!=null) module.inputIds.asJava else null).asInstanceOf[java.util.List[String]], - config.getOrElse("analyzer",module.getId).asInstanceOf[String], - config.get("sql") match {case Some(sql) => sql.asInstanceOf[String] case None => null }, - config.get("strategy") match {case Some(strategy)=> Class.forName(strategy.asInstanceOf[String]).newInstance().asInstanceOf[PartitionStrategy] case None => null} - ) - } -} - -object KafkaSinkExecutor{ -// val LOG = LoggerFactory.getLogger(classOf[KafkaSinkExecutor]) -} - -/** - * @todo currently support single topic now, should support topic selector - * @param config - */ -case class KafkaSinkExecutor(config:Map[String,AnyRef]) extends ((AnyRef) => Unit) with Serializable{ - - val TOPIC_KEY = "topic" - def getDefaultProps = { - val props = new Properties() - props.putAll(Map[String,AnyRef]( - "bootstrap.servers" -> "localhost:6667", - "acks" -> "all", - "retries" -> "3", - "batch.size" -> "16384", - "linger.ms" -> "1", - "buffer.memory" -> "33554432", - "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer", - "value.serializer" -> classOf[org.apache.eagle.dataproc.impl.storm.kafka.JsonSerializer].getCanonicalName - ).asJava) - props - } - - @transient var initialized:AtomicBoolean = new AtomicBoolean(false) - @transient var producer:KafkaProducer[String,AnyRef] = null - @transient var topic:String = null - @transient var timeoutMs:Long = 3000 - - private def init():Unit = { - if(this.initialized != null && this.initialized.get()){ -// LOG.info("Already initialized, skip") - return - } - this.initialized = new AtomicBoolean(false) - if (producer != null) { -// LOG.info(s"Closing $producer") - producer.close() - } -// LOG.info("Initializing and creating Kafka Producer") - if (config.contains(TOPIC_KEY)) { - this.topic = config.get(TOPIC_KEY).get.asInstanceOf[String] - } else { - throw new IllegalStateException("topic is not defined") - } - val props = getDefaultProps - props.putAll((config - TOPIC_KEY).asJava) - producer = new KafkaProducer[String, AnyRef](props) -// LOG.info(s"Created new KafkaProducer: $producer") - initialized.set(true) - } - - override def apply(value: AnyRef): Unit = { - if(initialized == null || !initialized.get()) init() - if(topic == null) throw new IllegalStateException("topic is not defined") - val isList = value.isInstanceOf[java.util.List[AnyRef]] - val record: ProducerRecord[String, AnyRef] = if(isList){ - val list = value.asInstanceOf[java.util.List[AnyRef]] - if(list.size() == 1) { - new ProducerRecord[String, AnyRef](topic, value.asInstanceOf[java.util.List[AnyRef]].get(0)) - }else{ - new ProducerRecord[String, AnyRef](topic, value) - } - }else{ - new ProducerRecord[String, AnyRef](topic,value) - } - producer.send(record,new Callback(){ - override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = { - if(exception!=null){ -// LOG.error(s"Failed to send record $value to topic: $topic",exception) - throw new IllegalStateException(s"Failed to send record $value to topic: $topic",exception) - } - } - }) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala deleted file mode 100644 index 7e1f4cf..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala +++ /dev/null @@ -1,235 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.stream.pipeline.parser - -import com.typesafe.config.Config -import org.apache.eagle.stream.pipeline.utils.ParseException - -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ - -import scala.collection.mutable - - -class DataFlow { - def getInputs(id: String):Seq[Processor] = { - this.getConnectors.filter(_.to.equals(id)).map(c => getProcessor(c.from).get) - } - - /** - * Connect if not, do nothing if already connected - * - * @param from - * @param to - */ - def connect(from: String, to: String): Unit = { - val connector = Connector(from,to,null) - var exists = false - connectors.foreach(c => exists = (c.from.equals(from) && c.to.equals(to)) || exists) - if(!exists) addConnector(connector) - } - - private var processors = mutable.Map[String,Processor]() - private var connectors = mutable.Seq[Connector]() - def setProcessors(processors:Seq[Processor]):Unit = { - processors.foreach{module => - this.processors.put(module.getId,module) - } - } - def setProcessors(processors:mutable.Map[String,Processor]):Unit = { - this.processors = processors - } - def setConnectors(connectors:Seq[Connector]):Unit = { - connectors.foreach(connector =>{ - this.connectors :+= connector - }) - } - def addProcessor(module:Processor):Unit = { - if(contains(module)) throw new IllegalArgumentException(s"Duplicated processor id error, ${module.getId} has already been defined as ${getProcessor(module.getId)}") - processors.put(module.getId,module) - } - - def contains(module:Processor):Boolean = processors.contains(module.getId) - def addConnector(connector:Connector):Unit = { - connectors :+= connector - } - def getProcessors:Seq[Processor] = processors.values.toSeq - def getProcessor(processorId:String):Option[Processor] = processors.get(processorId) - def getConnectors:Seq[Connector] = connectors -} - -/** - * Stream Processor - * - * @param processorId - * @param processorType - * @param schema - * @param processorConfig - */ -case class Processor(var processorId:String = null,var processorType:String = null,var schema:Schema = null, var processorConfig:Map[String,AnyRef] = null) extends Serializable { - private[pipeline] var inputs:Seq[Processor] = null - private[pipeline] var inputIds:Seq[String] = null - - def getId:String = processorId - def getType:String = processorType - def getConfig:Map[String,AnyRef] = processorConfig - def getSchema:Option[Schema] = if(schema == null) None else Some(schema) - - /** - * @todo assume processorId as streamId - * @return - */ - def streamId = processorId -} - -case class Connector (from:String,to:String, config:Map[String,AnyRef]) extends Serializable{ - import Connector._ - - def group:Option[String] = config.get(GROUP_FIELD).asInstanceOf[Option[String]] - def groupByFields:Option[Seq[String]] = config.get(GROUP_BY_FIELD_FIELD) match { - case Some(obj) => Some(obj.asInstanceOf[java.util.List[String]].asScala.toSeq) - case None => None - } - def groupByIndexes:Option[Seq[Int]] = config.get(GROUP_BY_INDEX_FIELD) match { - case Some(obj) => Some(obj.asInstanceOf[java.util.List[java.lang.Integer]].asScala.toSeq.map(Int.unbox(_))) - case None => None - } -} - -object Connector{ - val GROUP_FIELD = "grouping" - val GROUP_BY_FIELD_FIELD = "groupByField" - val GROUP_BY_INDEX_FIELD = "groupByIndex" -} - -private [pipeline] -object Processor { - val SCHEMA_FIELD:String = "schema" - val INPUTS_FIELD = "inputs" - def parse(processorId:String,processorType:String,context:Map[String,AnyRef], schemaSet:SchemaSet):Processor = { - val schema = context.get(SCHEMA_FIELD) match { - case Some(schemaDef) => schemaDef match { - case schemaId:String => schemaSet.get(schemaId).getOrElse { - throw new ParseException(s"Schema [$schemaId] is not found but referred by [$processorType:$processorId] in $context") - } - case schemaMap:java.util.HashMap[String,AnyRef] => Schema.parse(schemaMap.toMap) - case _ => throw new ParseException(s"Illegal value for schema: $schemaDef") - } - case None => null - } - val instance = new Processor(processorId,processorType,schema,context-SCHEMA_FIELD) - if(context.contains(INPUTS_FIELD)) instance.inputIds = context.get(INPUTS_FIELD).get.asInstanceOf[java.util.List[String]].asScala.toSeq - instance - } -} - - -trait DataFlowParser { - def parse(config:Config,schemaSet:SchemaSet = SchemaSet.empty()):DataFlow = { - val dataw = new DataFlow() - val map = config.root().unwrapped().toMap - - // Parse processors and connectors - map.foreach(entry => { - parseSingle(entry._1,entry._2.asInstanceOf[java.util.HashMap[String,AnyRef]].toMap,dataw,schemaSet) - }) - expand(dataw) - validate(dataw) - dataw - } - - private def expand(datafw: DataFlow):Unit = { - datafw.getProcessors.foreach(proc =>{ - if(proc.inputIds!=null) { - proc.inputIds.foreach(id => { - // connect if not - datafw.connect(id,proc.getId) - }) - } - proc.inputs = datafw.getInputs(proc.getId) - proc.inputIds = proc.inputs.map(_.getId) - }) - } - - private def - validate(pipeline:DataFlow): Unit ={ - def checkModuleExists(id:String): Unit ={ - pipeline.getProcessor(id).orElse { - throw new ParseException(s"Stream [$id] is not defined before being referred") - } - } - - pipeline.getConnectors.foreach {connector => - checkModuleExists(connector.from) - checkModuleExists(connector.to) - } - } - - private def - parseSingle(identifier:String,config:Map[String,AnyRef],dataflow:DataFlow, schemaSet: SchemaSet):Unit = { - Identifier.parse(identifier) match { - case DefinitionIdentifier(processorType) => { - config foreach {entry => - dataflow.addProcessor(Processor.parse(entry._1, processorType,entry._2.asInstanceOf[java.util.HashMap[String, AnyRef]].toMap,schemaSet)) - } - } - case ConnectionIdentifier(fromIds,toId) => fromIds.foreach { fromId => - if(fromId.eq(toId)) throw new ParseException(s"Can't connect $fromId to $toId") - dataflow.addConnector(Connector(fromId,toId,config)) - } - case _ => ??? - } - } -} - - -private[pipeline] trait Identifier - -private[pipeline] case class DefinitionIdentifier(moduleType: String) extends Identifier -private[pipeline] case class ConnectionIdentifier(fromIds: Seq[String], toId: String) extends Identifier - -private[pipeline] object Identifier { - val ConnectorFlag = "->" - val UnitFlagSplitPattern = "\\|" - val UnitFlagChar = "|" - val ConnectorPattern = s"([\\w-|\\s]+)\\s+$ConnectorFlag\\s+([\\w-_]+)".r - def parse(identifier: String): Identifier = { - // ${id} -> ${id} - ConnectorPattern.findFirstMatchIn(identifier) match { - case Some(matcher) => { - if(matcher.groupCount != 2){ - throw new ParseException(s"Illegal connector definition: $identifier") - }else{ - val source = matcher.group(1) - val destination = matcher.group(2) - if(source.contains(UnitFlagChar)) { - val sources = source.split(UnitFlagSplitPattern).toSeq - ConnectionIdentifier(sources.map{_.trim()},destination) - }else{ - ConnectionIdentifier(Seq(source),destination) - } - } - } - case None => { - if(identifier.contains(ConnectorFlag)) throw new ParseException(s"Failed to parse $identifier") - DefinitionIdentifier(identifier) - } - } - } -} - -object DataFlow extends DataFlowParser \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala deleted file mode 100644 index eb09156..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala +++ /dev/null @@ -1,95 +0,0 @@ -package org.apache.eagle.stream.pipeline.parser - -import java.io.File - -import com.typesafe.config.{Config, ConfigFactory} - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -case class Pipeline(config:Config,dataflow:DataFlow) - -/** - * Pipeline configuration parser - * - * For example: - * - * {{{ - * <code> - * { - * config { - * execution.environment.config = someValue - * } - * schema { - * metricStreamSchema { - * metric: string - * value: double - * timestamp: long - * } - * } - * dataflow { - * kafkaSource.source1 { - * schema = "metricStreamSchema" - * } - * kafkaSource.source2 { - * schema = { - * metric: string - * value: double - * timestamp: long - * } - * } - * } - * } - * </code> - * }}} - */ -trait PipelineParser{ - val CONFIG_FIELD = "config" - val SCHEMA_FIELD = "schema" - val DATAFLOW_FIELD = "dataflow" - - def parse(config:Config):Pipeline = { - if(config.isEmpty) throw new IllegalArgumentException("Pipeline configuration is empty") - var pConfig:Config = ConfigFactory.empty() - var pSchemaSet:SchemaSet = SchemaSet.empty() - var pDataflow:DataFlow = null - if(config.hasPath(CONFIG_FIELD)) pConfig = config.getConfig(CONFIG_FIELD) - if(config.hasPath(SCHEMA_FIELD)) pSchemaSet = SchemaSet.parse(config.getConfig(SCHEMA_FIELD)) - if(config.hasPath(DATAFLOW_FIELD)) pDataflow = DataFlow.parse(config.getConfig(DATAFLOW_FIELD),pSchemaSet) - - // Merge pipeline config over base config - val baseConfig =ConfigFactory.load() - pConfig = if(pConfig!=null) pConfig.withFallback(baseConfig) else baseConfig - new Pipeline(pConfig,pDataflow) - } - - def parseString(config:String):Pipeline = parse(ConfigFactory.parseString(config)) - - def parseStringWithConfig(dataFlow:String, config: Config) = { - val pConfig = config.withFallback(ConfigFactory.parseString(dataFlow)) - parse(pConfig) - } - - def parseResource(resource:String):Pipeline = { - // TODO: Load environment, currently hard-code with storm - if(resource.startsWith("/") || resource.startsWith("./")){ - parse(ConfigFactory.parseFile(new File(resource))) - } else{ - parse(ConfigFactory.parseResourcesAnySyntax(getClass.getClassLoader,resource)) - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala deleted file mode 100644 index 7653f9e..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala +++ /dev/null @@ -1,152 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.stream.pipeline.parser - -import com.typesafe.config.Config - -import scala.collection.JavaConversions.mapAsScalaMap -import scala.collection.mutable - -class Field(name:String) extends Serializable{ - def getName:String = name -} - -case class StringField(name:String) extends Field(name) -case class LongField(name:String) extends Field(name) -case class IntegerField(name:String) extends Field(name) -case class BooleanField(name:String) extends Field(name) -case class FloatField(name:String) extends Field(name) -case class DoubleField(name:String) extends Field(name) -case class DatetimeField(name:String,format:String) extends Field(name) - -object Field{ - def string(name:String) = StringField(name) - def long(name:String) = LongField(name) - def integer(name:String) = IntegerField(name) - def boolean(name:String) = BooleanField(name) - def float(name:String) = FloatField(name) - def double(name:String) = DoubleField(name) - def datetime(name:String)(format:String) = DatetimeField(name,format) - - def apply(name:String,typeName:String):Field = typeName match { - case "string" => string(name) - case "long" => long(name) - case "integer" => integer(name) - case "boolean" => boolean(name) - case "float" => float(name) - case "double" => double(name) - case _ => throw new UnsupportedOperationException(s"""Unknown attribute type $typeName for attribute "$name"""") - } -} - -case class Schema(attributes:Seq[Field]) extends Serializable{ - def getAttribute(attributeName:String):Option[Field]={ - if(attributes != null){ - attributes.find(_.getName.eq(attributeName)) - }else None - } - - def indexOfAttribute(attributeName:String):Int = { - if(attributes != null){ - attributes.indexWhere(_.getName.eq(attributeName)) - } else -1 - } - - @throws[IllegalArgumentException] - def indexOfAttributeOrException(attributeName:String):Int = { - if(attributes != null){ - attributes.indexWhere(_.getName.eq(attributeName)) - } else throw new IllegalArgumentException(s"Attribute [$attributeName] is not found in stream $this") - } -} - -object Schema{ - def parse(map:Map[String,AnyRef]):Schema = { - new Schema(map.keys.map {attributeName => - map(attributeName) match{ - case simpleType:String => Field(attributeName,simpleType) - case complexType:java.util.Map[String,AnyRef] => throw new IllegalStateException(s"ComplexType attribute definition is not supported yet [$attributeName : $complexType] ") - case otherType@_ => throw new IllegalStateException(s"Illegal attribute definition $attributeName : $otherType") - } - }.toSeq) - } - - /** - * @param attributes support string, symbol, Attribute and so on. - * @return - */ - def build(attributes:Seq[AnyRef]):Schema = { - new Schema(attributes.map{ a:AnyRef => - a match { - case t:(String, AnyRef) => { - t._2 match { - case v:String => Field(t._1,v) - case v:Symbol => Field(t._1,v.name) - case _ => throw new UnsupportedOperationException(s"Illegal attribute definition $a") - } - } - case t:Field => t - case _ => throw new UnsupportedOperationException(s"Illegal attribute definition $a") - } - }) - } -} - -private[pipeline] class StreamUndefinedException(message:String = "stream is not defined",throwable: Throwable = null) extends Exception(message,throwable) - -private[pipeline] class SchemaSet { - private val processorSchemaCache = mutable.Map[String,Schema]() - def set(schemaId:String,schema:Schema):Unit = { - if(processorSchemaCache.contains(schemaId)) throw new IllegalArgumentException( - s""" - |Failed to define schema for $schemaId as $schema, - |because it has been defined as ${processorSchemaCache(schemaId)}, - |please call updateSchema(processorId,schema) instead - """) - processorSchemaCache.put(schemaId,schema) - } - def get(schemaId:String):Option[Schema] = processorSchemaCache.get(schemaId) -} - -private[pipeline] object SchemaSet{ - def empty() = new SchemaSet() - /** - * For example: - * - * <code> - * { - * metricStream { - * metric: string - * value: double - * timestamp: long - * } - * } - * </code> - * @param schemaConfig - * @return - */ - def parse(schemaConfig:Map[String,AnyRef]):SchemaSet = { - val schemas = new SchemaSet() - schemaConfig.foreach(entry =>{ - schemas.set(entry._1,Schema.parse(entry._2.asInstanceOf[java.util.HashMap[String,AnyRef]].toMap)) - }) - schemas - } - - def parse(config:Config):SchemaSet = parse(config.root().unwrapped().asInstanceOf[java.util.HashMap[String,AnyRef]].toMap) -} - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala deleted file mode 100644 index 1c964e1..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.stream.pipeline.runner - -import java.util - -import com.typesafe.config.Config -import org.apache.commons.cli.{CommandLine, Options} -import org.apache.eagle.dataproc.util.ConfigOptionParser -import org.apache.eagle.datastream.ExecutionEnvironments.storm -import org.apache.eagle.datastream.core.ExecutionEnvironment -import org.apache.eagle.stream.pipeline.compiler.PipelineCompiler -import org.apache.eagle.stream.pipeline.parser.PipelineParser -import org.slf4j.LoggerFactory - -import scala.reflect.runtime.{universe => ru} - -trait PipelineRunner extends PipelineParser with PipelineCompiler{ - import PipelineCLIOptionParser._ - private val LOG = LoggerFactory.getLogger("PipelineCLIOptionParser") - def submit[T <: ExecutionEnvironment](resource:String)(implicit typeTag:ru.TypeTag[T]) = - compile(parseResource(resource)).submit[T] - def submit(resource:String,clazz:Class[ExecutionEnvironment]) = - compile(parseResource(resource)).submit(clazz) - def submit(pipelineConfig:Config,clazz:Class[ExecutionEnvironment]) = - compile(parse(pipelineConfig)).submit(clazz) - def submit[T <: ExecutionEnvironment](pipelineConfig:Config)(implicit typeTag: ru.TypeTag[T]) = - compile(parse(pipelineConfig)).submit[T] - - def apply(args:Array[String]):PipelineRunner = { - new ConfigOptionParser().load(args) - this - } - - def main(args: Array[String]): Unit = { - val config = PipelineCLIOptionParser.load(args) - if(config.hasPath(PIPELINE_CONFIG_KEY)) { - submit[storm](config.getString(PIPELINE_CONFIG_KEY)) - } else { - sys.error( - s""" - |Error: --$PIPELINE_OPT_KEY is required - |$USAGE - """.stripMargin) - } - } -} - -private[runner] object PipelineCLIOptionParser extends ConfigOptionParser{ - val LOG = LoggerFactory.getLogger("PipelineCLIOptionParser") - val PIPELINE_OPT_KEY="pipeline" - - val PIPELINE_CONFIG_KEY="pipeline.config" - - val CONFIG_OPT_KEY="conf" - val CONFIG_RESOURCE_KEY="config.resource" - val CONFIG_FILE_KEY="config.file" - val USAGE = - """ - |Usage: java org.apache.eagle.stream.pipeline.Pipeline [options] - | - |Options: - | --pipeline pipeline configuration - | --conf common configuration - | --env storm (support spark, etc later) - | --mode local/remote/cluster - """.stripMargin - - override protected def options(): Options = { - val options = super.options() - options.addOption(PIPELINE_OPT_KEY, true, "Pipeline configuration file") - options.addOption(CONFIG_OPT_KEY, true, "Config properties file") - options - } - - override protected def parseCommand(cmd: CommandLine): util.Map[String, String] = { - val map = super.parseCommand(cmd) - - if (cmd.hasOption(PIPELINE_OPT_KEY)) { - val pipelineConf = cmd.getOptionValue(PIPELINE_OPT_KEY) - if(pipelineConf == null){ - throw new IllegalArgumentException(s"--$PIPELINE_OPT_KEY should not be null") - } else { - LOG.info(s"Set $PIPELINE_CONFIG_KEY as $pipelineConf") - map.put(PIPELINE_CONFIG_KEY, pipelineConf) - } - } - - if(cmd.hasOption(CONFIG_OPT_KEY)){ - val commonConf = cmd.getOptionValue(CONFIG_OPT_KEY) - if(commonConf.contains("/")){ - LOG.info(s"Set $CONFIG_FILE_KEY as $commonConf") - map.put(CONFIG_FILE_KEY, commonConf) - }else { - LOG.info(s"Set $CONFIG_RESOURCE_KEY $commonConf") - map.put(CONFIG_RESOURCE_KEY, commonConf) - } - } - map - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala deleted file mode 100644 index 1102a33..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.stream.pipeline.utils - -class ParseException(message:String) extends Exception(message) -class CompileException(message:String) extends Exception(message) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf deleted file mode 100644 index 3e8f69c..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf +++ /dev/null @@ -1,34 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing software -# distributed under the License is distributed on an "AS IS" BASIS -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -{ - "eagleProps" : { - "dataJoinPollIntervalSec" : 30 - "mailHost" : "smtp.server.host" - "mailSmtpPort":"25" - "mailDebug" : "true" - "eagleService": { - "host": "localhost" - "port": 9099 - "username": "admin" - "password": "secret" - } - } - "dynamicConfigSource" : { - "enabled" : true - "initDelayMillis" : 0 - "delayMillis" : 30000 - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh deleted file mode 100644 index 4250681..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing software -# distributed under the License is distributed on an "AS IS" BASIS -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# ./eagle-pipeline.sh --pipeline [pipeline-definition-config] --config [base-configuration] - - - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties deleted file mode 100644 index c8a4f46..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties +++ /dev/null @@ -1,19 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -log4j.rootLogger=INFO, stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf deleted file mode 100644 index 6dddf7a..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf +++ /dev/null @@ -1,131 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing software -# distributed under the License is distributed on an "AS IS" BASIS -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -{ - config { - alertExecutorConfigs { - defaultAlertExecutor { - "parallelism" : 1 - "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner" - "needValidation" : "true" - } - } - eagleProps { - "site" : "sandbox" - "application": "eventSource" - "dataJoinPollIntervalSec" : 30 - "mailHost" : "mail.host.com" - "mailSmtpPort":"25" - "mailDebug" : "true" - "eagleService": { - "host": "localhost" - "port": 38080 - "username": "admin" - "password": "secret" - } - } - dynamicConfigSource { - "enabled" : true - "initDelayMillis" : 0 - "delayMillis" : 30000 - } - } - - schema { - metricStreamSchema { - metric: string - value: double - timestamp: long - } - } - - dataflow { - KafkaSource.metricStream_1 { - parallism = 1000 - topic = "metric_event_1" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - schema = "metricStreamSchema" - } - - KafkaSource.metricStream_2 { - parallism = 1000 - topic = "metric_event_2" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - KafkaSource.metricStream_3{ - parallism = 1000 - topic = "metric_event_3" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - schema = "metricStreamSchema" - } - - KafkaSink.metricStore { - schema = "metricStreamSchema" - parallism = 1000 - topic = "metric_event_2" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - Alert.alert { -// upStreamNames = [metricStream_1,metricStream_2] - alertExecutorId = defaultAlertExecutor - } - -// aggregator.aggreator { -// executor = "aggreationExecutor" -// } - - metricStream_1|metricStream_2 -> alert { - group = shuffle - } - - metricStream_1|metricStream_2 -> metricStore { - group = shuffle - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf deleted file mode 100644 index 5e3561a..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf +++ /dev/null @@ -1,93 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing software -# distributed under the License is distributed on an "AS IS" BASIS -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -{ - config { - envContextConfig { - "env" : "storm" - "mode" : "local" - "topologyName" : "dsl-based-topology" - } - eagleProps { - "site" : "sandbox" - "application": "eventSource" - "dataJoinPollIntervalSec" : 30 - "mailHost" : "mail.host.com" - "mailSmtpPort":"25" - "mailDebug" : "true" - "eagleService": { - "host": "localhost" - "port": 38080 - "username": "admin" - "password": "secret" - } - } - dynamicConfigSource { - "enabled" : true - "initDelayMillis" : 0 - "delayMillis" : 30000 - } - } - - dataflow { - KafkaSource.metricStream_1 { - parallism = 1000 - topic = "metric_event_1" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - KafkaSource.metricStream_2 { - parallism = 1000 - topic = "metric_event_2" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - KafkaSource.metricStream_3{ - parallism = 1000 - topic = "metric_event_3" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - Console.printer {} - - metricStream_1|metricStream_2|metricStream_3 -> printer { - grouping = shuffle - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf deleted file mode 100644 index 9dc7ce3..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf +++ /dev/null @@ -1,152 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing software -# distributed under the License is distributed on an "AS IS" BASIS -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -{ - config { - envContextConfig { - "env" : "storm" - "mode" : "local" - "topologyName" : "dsl-based-topology" - "parallelismConfig" : { - "kafkaMsgConsumer" : 1 - } - } - alertExecutorConfigs { - defaultAlertExecutor { - "parallelism" : 1 - "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner" - "needValidation" : "true" - } - } - eagleProps { - "site" : "sandbox" - "application": "HADOOP" - "dataJoinPollIntervalSec" : 30 - "mailHost" : "some.mail.server" - "mailSmtpPort":"25" - "mailDebug" : "true" - "eagleService": { - "host": "localhost" - "port": 38080 - "username": "admin" - "password": "secret" - } - } - dynamicConfigSource { - "enabled" : true - "initDelayMillis" : 0 - "delayMillis" : 30000 - } - } - - schema { -// JmxStreamOne { -// attributes { -// metric: string -// value: double -// timestamp: long -// } -// alertExecutorId = [defaultAlertExecutor,anotherAlertExecutor] -// } - JmxStreamOne { - metric: string - value: double - timestamp: long - } - } - - dataflow { - KafkaSource.JmxStreamOne { - parallism = 1000 - topic = "metric_event_1" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - KafkaSource.JmxStreamTwo { - parallism = 1000 - topic = "metric_event_2" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - KafkaSource.JmxStreamThree{ - parallism = 1000 - topic = "metric_event_3" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - Console.printer { - format = "%s" - } - - KafkaSink.metricStore { - topic = "metric_event_persist" - } - -// KafkaSink.alertStore { -// "topic" = "alert_persist" -// "bootstrap.servers" = "localhost:6667" -// } - - Alert.alert { - inputs = [JmxStreamOne,JmxStreamTwo,JmxStreamThree] - - upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree] - alertExecutorId = defaultAlertExecutor - } - -// Aggregator.aggreator { -// upStreamNames = [] -// analyzerId = "" -// cepQl = "" -// strategy = "" -// } - - JmxStreamOne|JmxStreamTwo|JmxStreamThree -> alert { - grouping = shuffle - } - - JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore { - grouping = shuffle - } - - JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer { - grouping = shuffle - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf deleted file mode 100644 index 9c35456..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf +++ /dev/null @@ -1,125 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing software -# distributed under the License is distributed on an "AS IS" BASIS -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -{ - config { - envContextConfig { - "env" : "storm" - "mode" : "local" - "topologyName" : "dsl-topology" - "parallelismConfig" : { - "kafkaMsgConsumer" : 1 - }, - } - alertExecutorConfigs { - defaultAlertExecutor { - "parallelism" : 1 - "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner" - "needValidation" : "true" - } - } - eagleProps { - "site" : "sandbox" - "application": "HADOOP" - } - } - - dataflow { - KafkaSource.JmxStreamOne { - parallism = 1000 - topic = "metric_event_1" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - KafkaSource.JmxStreamTwo { - parallism = 1000 - topic = "metric_event_2" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - KafkaSource.JmxStreamThree{ - parallism = 1000 - topic = "metric_event_3" - zkConnection = "localhost:2181" - zkConnectionTimeoutMS = 15000 - consumerGroupId = "Consumer" - fetchSize = 1048586 - transactionZKServers = "localhost" - transactionZKPort = 2181 - transactionZKRoot = "/consumers" - transactionStateUpdateMS = 2000 - deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer" - } - - Console.printer { - format = "%s" - } - - KafkaSink.metricStore { - topic = "metric_event_persist" - } - -// KafkaSink.aggSink { -// topic = "metric_agg_persist" -// } - - Alert.defaultAlertExecutor { - // upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree] - // alertExecutorId = defaultAlertExecutor - } - - Aggregator.Aggregator{ sql = """ - define stream JmxStreamOne(eagleAlertContext object, timestamp long, metric string, value double); - @info(name = "query") - from JmxStreamOne[value > 100.0] select * insert into outputStream; - """} - - - JmxStreamOne -> Aggregator {} - - Aggregator -> printer {} - -// Aggregator -> aggregatedSink{ -// grouping = shuffle -// } - JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor { - grouping = shuffle - } - - JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore { - grouping = shuffle - } - - JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer { - grouping = shuffle - } - } -} \ No newline at end of file
