http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala index c511484..444559a 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala @@ -22,7 +22,7 @@ import com.typesafe.config.Config trait StreamContextAdapter{ def submit(context:StreamContext):Unit = { - execute(context.build) + execute(null) } def execute(dag: StreamDAG) }
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala deleted file mode 100644 index 9564a0d..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala +++ /dev/null @@ -1,80 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.core - -import com.typesafe.config.Config -import org.apache.eagle.dataproc.impl.aggregate.AggregateExecutorFactory -import org.apache.eagle.datastream.FlatMapper -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -import scala.collection.JavaConversions.asScalaSet -import scala.collection.mutable.ListBuffer - -/** - * The expansion job for stream analyze - * - * TODO : should re-use flow with stream alert expansion, make code cleaner - */ -class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(config) { - - override def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any, Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]], - dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], current: StreamProducer[Any], - child: StreamProducer[Any]): Unit = { - child match { - case AggregateProducer(upStreamNames, analyzerId, cepQl, strategy) => { - /** - * Rewrite the tree to add output field wrapper since policy executors accept only fixed tuple format - */ - val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames) - - val analyzeExecutors = if (cepQl != null) { - AggregateExecutorFactory.Instance.createExecutors(upStreamNames, cepQl) - } else { - AggregateExecutorFactory.Instance.createExecutors(config, upStreamNames, analyzerId) - } - - analyzeExecutors.foreach(exec => { - val t = FlatMapProducer(exec.asInstanceOf[FlatMapper[Any]]).initWith(dag,config, hook = false).nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq).stream(child.stream) - - // connect with previous - if (strategy == null) { - newStreamProducers.foreach(s => toBeAddedEdges += StreamConnector(s, t)) - } else { - newStreamProducers.foreach(s => toBeAddedEdges += StreamConnector(s, t, strategy)) - } - - // connect with next - val outgoingEdges = dag.outgoingEdgesOf(child) - outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(t, e.to, e)) - }) - - // remote current child - toBeRemovedVertex += child - } - case _ => - } - } -} - -object StreamAggregateExpansion{ - def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamAggregateExpansion ={ - val e = new StreamAggregateExpansion(config) - e.expand(dag) - e - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala deleted file mode 100644 index 618bba3..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala +++ /dev/null @@ -1,257 +0,0 @@ -/* - - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream.core - -import java.util - -import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity -import org.apache.eagle.alert.executor.AlertExecutorCreationUtils -import org.apache.eagle.policy.common.Constants -import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl - -import scala.collection.JavaConversions.asScalaSet -import scala.collection.mutable.ListBuffer -import org.apache.eagle.datastream.JavaStormExecutorForAlertWrapper -import org.apache.eagle.datastream.JavaStormStreamExecutor -import org.apache.eagle.datastream.StormStreamExecutor -import org.apache.eagle.datastream.storm.StormExecutorForAlertWrapper -import org.apache.eagle.datastream.utils.AlertExecutorConsumerUtils -import org.apache.eagle.service.client.EagleServiceConnector -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory - -import com.typesafe.config.Config - -/** - * The constraints for alert is: - * 1. only 3 StreamProducers can be put immediately before MapProducer, FlatMapProducer, StreamUnionProducer - * 2. For StreamUnionProducer, the only supported unioned producers are MapProducer and FlatMapProducer - * 3. the output for MapProducer and FlatMapProducer is 2-field tuple, key and value, key is string, value has to be SortedMap - * 4. the framework will wrapper original MapProducer and FlatMapProducer to emit 3-field tuple, {key, streamName and value} - * 5. the framework will automatically partition traffic with first field - * - * - * 2 steps - * step 1: wrapper previous StreamProducer with one more field "streamName" - * step 2: partition alert executor by policy partitioner class - */ - -case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) { - val LOG = LoggerFactory.getLogger(classOf[StreamAlertExpansion]) - import StreamAlertExpansion._ - - override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): Unit ={ - val iter = dag.iterator() - val toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]] - val toBeRemovedVertex = new ListBuffer[StreamProducer[Any]] - while(iter.hasNext) { - val current = iter.next() - dag.outgoingEdgesOf(current).foreach(edge => { - val child = edge.to - onIteration(toBeAddedEdges, toBeRemovedVertex, dag, current, child) - }) - } - // add back edges - toBeAddedEdges.foreach(e => { - dag.addVertex(e.from) - dag.addVertex(e.to) - dag.addEdge(e.from, e.to, e) - }) - toBeRemovedVertex.foreach(v => dag.removeVertex(v)) - } - - def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]], - dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], child: StreamProducer[Any]): Unit = { - child match { - case AlertStreamProducer(upStreamNames, alertExecutorId, withConsumer,strategy) => { - /** - * step 1: wrapper previous StreamProducer with one more field "streamName" - * for AlertStreamSink, we check previous StreamProducer and replace that - */ - val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames) - - /** - * step 2: partition alert executor by policy partitioner class - */ - val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config, - new PolicyDefinitionEntityDAOImpl[AlertDefinitionAPIEntity](new EagleServiceConnector(config), Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME), - upStreamNames, alertExecutorId) - var alertProducers = new scala.collection.mutable.MutableList[StreamProducer[Any]] - alertExecutors.foreach(exec => { - val t = FlatMapProducer(exec).nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq).initWith(dag,config, hook = false) - alertProducers += t - newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector[Any,Any](newsp, t,Seq(0))) - if (strategy == null) { - newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp,t,Seq(0))) - } - else { - newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp,t,strategy)) - } - }) - - // remove AlertStreamSink - toBeRemovedVertex += child - - // add alert consumer if necessary - if (withConsumer) { - AlertExecutorConsumerUtils.setupAlertConsumers(toBeAddedEdges, alertProducers.toList) - } - } - case _ => - } - } - - protected def rewriteWithStreamOutputWrapper(current: org.apache.eagle.datastream.core.StreamProducer[Any], dag: org.jgrapht.experimental.dag.DirectedAcyclicGraph[org.apache.eagle.datastream.core.StreamProducer[Any],org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeAddedEdges: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeRemovedVertex: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamProducer[Any]], upStreamNames: java.util.List[String]) = { - if(upStreamNames == null) throw new NullPointerException("upStreamNames is null") - - /** - * step 1: wrapper previous StreamProducer with one more field "streamName" - * for AlertStreamSink, we check previous StreamProducer and replace that - */ - val newStreamProducers = new ListBuffer[StreamProducer[Any]] - current match { - case StreamUnionProducer(others) => { - val incomingEdges = dag.incomingEdgesOf(current) - incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0))) - var i: Int = 1 - others.foreach(o => { - newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(i)) - i += 1 - }) - } - case p: FlatMapProducer[AnyRef, AnyRef] => { - newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(p,upStreamNames)) - } - case p: MapperProducer[AnyRef,AnyRef] => { - newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(p,upStreamNames)) - } - case s: StreamProducer[AnyRef] if dag.inDegreeOf(s) == 0 => { - newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(s,upStreamNames)) - } - case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported") - } - newStreamProducers - } - - - protected def replace(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]], - dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], upStreamName: String) : StreamProducer[Any]= { - var newsp: StreamProducer[Any] = null - current match { - case _: FlatMapProducer[AnyRef, AnyRef] => { - val mapper = current.asInstanceOf[FlatMapProducer[_, _]].mapper - mapper match { - case a: JavaStormStreamExecutor[AnyRef] => { - val newmapper = new JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName) - newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId) - } - case b: StormStreamExecutor[AnyRef] => { - val newmapper = StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName) - newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId) - } - case _ => throw new IllegalArgumentException - } - // remove old StreamProducer and replace that with new StreamProducer - val incomingEdges = dag.incomingEdgesOf(current) - incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp)) - val outgoingEdges = dag.outgoingEdgesOf(current) - outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to)) - toBeRemovedVertex += current - } - case _: MapperProducer[Any,Any] => { - val mapper = current.asInstanceOf[MapperProducer[Any,Any]].fn - val newfun: (Any => Any) = { a => - val result = mapper(a) - result match { - case scala.Tuple1(x1) => (null, upStreamName, x1) - case scala.Tuple2(x1, x2) => (x1, upStreamName, x2) - case scala.Tuple3(_, _, _) => result - case _ => throw new IllegalArgumentException(s"Illegal message :$result, Tuple1/Tuple2/Tuple3 are supported") - } - } - current match { - case MapperProducer(_, fn) => newsp = MapperProducer(3, newfun).initWith(dag,config,hook = false).stream(current.stream) - case _ => throw new IllegalArgumentException(s"Illegal producer $current") - } - val incomingEdges = dag.incomingEdgesOf(current) - incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp)) - val outgoingEdges = dag.outgoingEdgesOf(current) - outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to)) - toBeRemovedVertex += current - } - case s: StreamProducer[Any] if dag.inDegreeOf(s) == 0 => { - val fn:(AnyRef => AnyRef) = { - n => { - n match { - case scala.Tuple3 => n - case scala.Tuple2(x1,x2) => (x1,upStreamName,x2) - case scala.Tuple1(x1) => (if(x1 == null) null else x1.hashCode(),upStreamName,x1) - case _ => (if(n == null) null else n.hashCode(),upStreamName,n) - } - } - } - newsp = MapperProducer(3,fn).initWith(dag,config,hook = false).stream(s.stream) - toBeAddedEdges += StreamConnector(current,newsp) - val outgoingEdges = dag.outgoingEdgesOf(current) - outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp,e.to)) - } - case _ => throw new IllegalArgumentException("Only FlatMapProducer and MapProducer can be replaced before AlertStreamSink") - } - newsp - } -} - -object StreamAlertExpansion{ - def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamAlertExpansion ={ - val e = StreamAlertExpansion(config) - e.expand(dag) - e - } - - /** - * Try upStreamNames firstly, otherwise try producer.streamId - * - * @param producer - * @param upStreamNames - * @return - */ - private def recognizeSingleStreamName(producer: StreamProducer[AnyRef],upStreamNames:util.List[String]):String = { - if(upStreamNames == null){ - producer.streamId - }else if(upStreamNames.size()>1){ - if(producer.streamId == null) { - if (upStreamNames.size() > 1) - throw new IllegalStateException("Too many (more than 1) upStreamNames " + upStreamNames + " given for " + producer) - else - upStreamNames.get(0) - } else { - producer.streamId - } - } else if(upStreamNames.size() == 1){ - upStreamNames.get(0) - }else { - if(producer.streamId == null){ - throw new IllegalArgumentException("No stream name found for "+producer) - } else - producer.streamId - } - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala index 6e21bcc..fcd89c0 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala @@ -18,17 +18,17 @@ package org.apache.eagle.datastream.core import com.typesafe.config.{Config, ConfigFactory} import org.apache.eagle.dataproc.util.ConfigOptionParser -import org.apache.eagle.datastream.ExecutionEnvironments -import org.apache.eagle.datastream.utils.GraphPrinter import org.jgrapht.experimental.dag.DirectedAcyclicGraph +import scala.reflect import scala.reflect.runtime.universe._ trait StreamContextBuilder extends StreamSourceBuilder { def config:Configuration /** * Business logic DAG - * @return + * + * @return */ def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]] /** @@ -48,31 +48,23 @@ class StreamContext(private val conf:Config) extends StreamContextBuilder{ private val _config:Configuration = Configuration(conf) override def dag = _dag override def config = _config - override def build: StreamDAG = { - implicit val i_conf = _config.get - StreamNameExpansion() - GraphPrinter.print(dag,message="Before expanded DAG ") - StreamAggregateExpansion() - StreamAlertExpansion() - StreamUnionExpansion() - StreamGroupbyExpansion() - StreamParallelismConfigExpansion() - StreamNameExpansion() - GraphPrinter.print(dag,message="After expanded DAG ") - GraphPrinter.printDotDigraph(dag) - StreamDAGTransformer.transform(dag) + + def build:StreamDAG = { + null } + override def submit(env: ExecutionEnvironment): Unit = { env.submit(this) } - override def submit(clazz: Class[ExecutionEnvironment]): Unit = { - ExecutionEnvironments.get(clazz,conf).submit(this) + def submit[E<:ExecutionEnvironment](implicit typeTag:TypeTag[E]):Unit = { + } - override def submit[E <: ExecutionEnvironment](implicit typeTag: TypeTag[E]): Unit = { - ExecutionEnvironments.getWithConfig[E](conf).submit(this) + + override def submit(clazz: Class[ExecutionEnvironment]): Unit = { + } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala deleted file mode 100644 index ce9d82c..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamConnector.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.datastream.core - -import org.apache.eagle.partition.PartitionStrategy - -abstract class StreamConnector[+T1 <: Any,+T2 <: Any](val from: StreamProducer[T1], val to: StreamProducer[T2]) extends Serializable - -case class ShuffleConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2]) - extends StreamConnector[T1,T2](from,to){ - override def toString: String = "shuffleGroup" -} - -case class GroupbyFieldsConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],groupByFields : Seq[Int]) - extends StreamConnector[T1,T2](from,to){ - override def toString: String = s"groupByFields( $groupByFields )" -} - -case class GroupbyKeyConnector[T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],keySelector: T1 => Any) - extends StreamConnector[T1,T2](from,to){ - override def toString: String = s"groupByKey($keySelector)" -} - -case class GroupbyStrategyConnector[+T1 <: Any,+T2 <: Any](override val from: StreamProducer[T1], override val to: StreamProducer[T2],customGroupBy:PartitionStrategy) - extends StreamConnector[T1,T2](from,to){ - override def toString: String = s"groupByStrategy( $customGroupBy )" -} - -object StreamConnector{ - /** - * - * @param from - * @param to - * @tparam T1 - * @tparam T2 - * @return - */ - def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2]):ShuffleConnector[T1,T2] = ShuffleConnector(from,to) - - /** - * Clone connector from old connector to apply to new processing element, return ShuffleConnector by default - * - * @param from - * @param to - * @param connector - * @tparam T1 - * @tparam T2 - * @return - */ - def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],connector: StreamConnector[Any,Any]):StreamConnector[T1,T2] = connector match { - case GroupbyFieldsConnector(_,_,fields) => GroupbyFieldsConnector[T1,T2](from,to,fields) - case GroupbyKeyConnector(_,_,keySelector) => GroupbyKeyConnector[T1,T2](from,to,keySelector) - case GroupbyStrategyConnector(_,_,strategy) => GroupbyStrategyConnector[T1,T2](from,to,strategy) - case null | ShuffleConnector(_,_) => ShuffleConnector[T1,T2](from,to) - case c@_ => throw new IllegalArgumentException(s"Unknown type of stream connector $c") - } - - /** - * - * @param from - * @param to - * @param groupByFields - * @tparam T1 - * @tparam T2 - * @return - */ - def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],groupByFields : Seq[Int]):GroupbyFieldsConnector[T1,T2] = GroupbyFieldsConnector(from,to,groupByFields) - - /** - * - * @param from - * @param to - * @param customGroupBy - * @tparam T1 - * @tparam T2 - * @return - */ - def apply[T1 <: Any,T2 <: Any](from: StreamProducer[T1], to: StreamProducer[T2],customGroupBy: PartitionStrategy):GroupbyStrategyConnector[T1,T2] = GroupbyStrategyConnector(from,to,customGroupBy) - - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala deleted file mode 100644 index 7845740..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.datastream.core - -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -import scala.collection.JavaConverters._ -import scala.collection.{JavaConversions, mutable} - -/** - * wrapper of DAG, used for storm topology compiler - */ -class StreamDAG(val graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) extends StreamProducerGraph { - var nodeMap: mutable.Map[String, StreamProducer[Any]] = mutable.Map[String,StreamProducer[Any]]() - graph.iterator().asScala.foreach(p=> nodeMap.put(p.name,p)) - - override def addEdge(from: StreamProducer[Any], to: StreamProducer[Any], streamConnector: StreamConnector[Any,Any]): Unit = { - graph.addEdge(from, to, streamConnector) - } - - override def addVertex(producer: StreamProducer[Any]): Unit = { - graph.addVertex(producer) - nodeMap.put(producer.name,producer) - } - - override def iterator(): Iterator[StreamProducer[Any]] = { - JavaConversions.asScalaIterator(graph.iterator()) - } - - override def isSource(v: StreamProducer[Any]): Boolean = { - graph.inDegreeOf(v) match { - case 0 => true - case _ => false - } - } - - override def outgoingEdgesOf(v: StreamProducer[Any]): scala.collection.mutable.Set[StreamConnector[Any,Any]] = { - JavaConversions.asScalaSet(graph.outgoingEdgesOf(v)) - } - - override def getNodeByName(name: String): Option[StreamProducer[Any]] = { - nodeMap.get(name) - } - - def setNodeMap(nodeMap: mutable.Map[String, StreamProducer[Any]]): Unit = { - this.nodeMap = nodeMap - } - - override def incomingVertexOf(v: StreamProducer[Any]): scala.collection.mutable.Set[StreamProducer[Any]] = { - val set = mutable.Set[StreamProducer[Any]]() - graph.incomingEdgesOf(v).asScala.foreach(e => set += graph.getEdgeSource(e)) - set - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala deleted file mode 100644 index fcff639..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGExpansion.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.datastream.core - -import com.typesafe.config.Config -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -abstract class StreamDAGExpansion(config: Config) { - def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala deleted file mode 100644 index 6b20bf2..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream.core - -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -import scala.collection.mutable - -/** - * convert generic DAG data structure to Storm specific DAG data structure for easy topology compiler - */ -object StreamDAGTransformer { - /** - * Transform DirectedAcyclicGraph[StreamProducer, StreamConnector] into StormStreamDAG - * - * @param dag DirectedAcyclicGraph[StreamProducer, StreamConnector] - * @return StormStreamDAG - */ - @deprecated("Use StreamDAG(dag) will transform directly") - def transform(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) : StreamDAG = { - val stormDAG = new StreamDAG(dag) - val nodeMap = mutable.HashMap[String, StreamProducer[Any]]() - val iter = dag.iterator() - while(iter.hasNext){ - val sp = iter.next() - nodeMap.put(sp.name, sp) - } - stormDAG.setNodeMap(nodeMap) - stormDAG - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala deleted file mode 100644 index 1a07e3f..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamGroupbyExpansion.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream.core - -import com.typesafe.config.Config -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -import scala.collection.JavaConversions._ -import scala.collection.mutable.ListBuffer - -/** - * Replace GroupByProducer(Vertex) with StreamConnector (Edge) - * - * For example as to Storm, it's mainly for grouping method - * - * @param config context configuration - */ -case class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){ - override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = { - val iter = dag.iterator() - var toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]] - var toBeRemovedVertex = new ListBuffer[StreamProducer[Any]] - while(iter.hasNext) { - val current = iter.next() - dag.outgoingEdgesOf(current).foreach(edge => { - val child = edge.to - child match { - case p : GroupByProducer[Any] => { - dag.outgoingEdgesOf(p).foreach(c2 => { - p match { - case GroupByFieldProducer(fields) => - toBeAddedEdges += GroupbyFieldsConnector(current, c2.to,fields) - case GroupByStrategyProducer(strategy) => - toBeAddedEdges += GroupbyStrategyConnector(current, c2.to,strategy) - case GroupByKeyProducer(keySelector) => - current.outKeyed = true - current.keySelector = KeySelectorWrapper(keySelector) - c2.to.inKeyed = true - toBeAddedEdges += GroupbyKeyConnector(current, c2.to,keySelector) - case _ => toBeAddedEdges += ShuffleConnector(current, c2.to) - } - }) - toBeRemovedVertex += p - } - case _ => - } - }) - } - - // add back edges - toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e)) - toBeRemovedVertex.foreach(v => dag.removeVertex(v)) - } -} - -object StreamGroupbyExpansion{ - def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamGroupbyExpansion ={ - val e = StreamGroupbyExpansion(config) - e.expand(dag) - e - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala deleted file mode 100644 index 4bc1812..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamNameExpansion.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream.core - -import com.typesafe.config.Config -import org.apache.eagle.datastream.utils.NodeNameSelector -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory - -/** - * to set name for each StreamProducer - * 1. if name is given programatically, then use this name - * 2. otherwise use name generated by scala internally - */ -case class StreamNameExpansion(config: Config) extends StreamDAGExpansion(config){ - val LOG = LoggerFactory.getLogger(classOf[StreamNameExpansion]) - - override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = { - val iter = dag.iterator() - while(iter.hasNext){ - val sp = iter.next() - sp.name = NodeNameSelector(sp).getName - } - } -} - - -object StreamNameExpansion{ - def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamNameExpansion ={ - val e = StreamNameExpansion(config) - e.expand(dag) - e - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala deleted file mode 100644 index 8699da6..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream.core - -import java.util.regex.Pattern - -import com.typesafe.config.{Config, ConfigObject, ConfigValue} -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory - -import scala.collection.JavaConverters._ - -case class StreamParallelismConfigExpansion(config: Config) extends StreamDAGExpansion(config){ - val LOG = LoggerFactory.getLogger(classOf[StreamParallelismConfigExpansion]) - - override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = { - val map = getParallelismMap(config) - val iter = dag.iterator() - while(iter.hasNext){ - val streamProducer = iter.next() - if(streamProducer.name != null) { - map.foreach(tuple => { - tuple._1.matcher(streamProducer.name).find() match { - case true => streamProducer.parallelism(tuple._2) - case false => - } - }) - } - } - } - - private def getParallelismMap(config: Config) : Map[Pattern, Int]= { - if(config.hasPath("envContextConfig.parallelismConfig")) { - val parallelismConfig: ConfigObject = config.getObject("envContextConfig.parallelismConfig") - parallelismConfig.asScala.toMap map { - case (name, value) => (Pattern.compile(name), value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int]) - } - }else{ - Map[Pattern,Int]() - } - } -} - -object StreamParallelismConfigExpansion{ - def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamParallelismConfigExpansion ={ - val e = StreamParallelismConfigExpansion(config) - e.expand(dag) - e - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala index 6445643..33cc2dd 100644 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala +++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala @@ -173,16 +173,6 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[ ret } - /** - * alert is always sink of data flow - */ - def alertWithConsumer(upStreamNames: util.List[String], alertExecutorId : String) = { - alert(upStreamNames.asScala, alertExecutorId,consume = true) - } - - def alertWithoutConsumer(upStreamNames: util.List[String], alertExecutorId : String) = { - alert(upStreamNames.asScala, alertExecutorId, consume = false) - } override def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean=true, strategy : PartitionStrategy=null):AlertStreamProducer = { val ret = AlertStreamProducer(upStreamNames, alertExecutorId, consume, strategy) http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala deleted file mode 100644 index f3fcc4d..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducerGraph.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.datastream.core - -trait StreamProducerGraph { - def addEdge(from: StreamProducer[Any], to: StreamProducer[Any], streamConnector: StreamConnector[Any,Any]) - def addVertex(producer: StreamProducer[Any]) - def iterator() : Iterator[StreamProducer[Any]] - def isSource(v : StreamProducer[Any]) : Boolean - def outgoingEdgesOf(v : StreamProducer[Any]) : scala.collection.mutable.Set[StreamConnector[Any,Any]] - def getNodeByName(name : String) : Option[StreamProducer[Any]] - def incomingVertexOf(v: StreamProducer[Any]) : scala.collection.mutable.Set[StreamProducer[Any]] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala deleted file mode 100644 index b54b21f..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.datastream.core - -import com.typesafe.config.Config -import org.apache.commons.lang3.builder.HashCodeBuilder -import org.apache.eagle.datastream.{Collector, FlatMapper} -import org.apache.eagle.partition.PartitionStrategy -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -/** - * StreamInfo should be fully serializable and having not runtime type information - */ -class StreamInfo extends Serializable{ - /** - * Processing Element Id - */ - val id:Int = UniqueId.incrementAndGetId() - - /** - * Processing Element Name - */ - var name: String = null - - /** - * Output stream id, equals to name by default - */ - var streamId:String=null - - var parallelismNum: Int = 1 - - /** - * Keyed input stream - */ - var inKeyed:Boolean = false - /** - * Keyed output stream - */ - var outKeyed:Boolean = false - /** - * Output key selector - */ - var keySelector:KeySelector = null - -// Type Information -// ================ -// -// /** -// * Entity class type of T -// */ -// var typeClass:Class[_] = null -// -// /** -// * Type Class Simple Name -// * @return -// */ -// def typeClassName = if(typeClass == null) null else typeClass.getSimpleName -// -// @transient private var _typeTag[_] = null -// -// def typeTag[_] = { -// if(_typeTag == null) _typeTag = Reflections.typeTag(this.typeClass) -// _typeTag -// } - - var config: Config = null - - def getInfo = this - - override def hashCode(): Int = new HashCodeBuilder().append(this.id).append(this.getClass).toHashCode -} - - -object StorageType extends Enumeration { - type StorageType = Value - val KAFKA, DRUID, HBASE = Value -} - -/** - * Stream interaction protocol interface - * - * @tparam T processed elements type - */ -trait StreamProtocol[+T <: Any]{ - /** - * Initialize the stream metadata info - */ - def initWith(graph:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]],config:Config, hook:Boolean = true):StreamProducer[T] - - /** - * Support Java API - * - * @param flatMapper - * @tparam R - * @return - */ - def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R] - def flatMap[R](func:(Any,Collector[R])=>Unit): StreamProducer[R] - - /** - * - * @param fn - * @return - */ - def filter(fn : T => Boolean): StreamProducer[T] - - /** - * - * @param fn - */ - def foreach(fn : T => Unit) : Unit - - /** - * Type safe mapper - * @param fn - * @tparam R - * @return - */ - def map[R](fn : T => R): StreamProducer[R] - - /** - * Field base mapper - * @param fn - * @tparam R - * @return - */ - def map1[R](fn : T => R) : StreamProducer[R] - def map2[R](fn : T => R) : StreamProducer[R] - def map3[R](fn : T => R) : StreamProducer[R] - def map4[R](fn : T => R) : StreamProducer[R] - - def groupBy(fields : Int*) : StreamProducer[T] - def groupBy(fields : java.util.List[Integer]) : StreamProducer[T] - def groupBy(strategy : PartitionStrategy) : StreamProducer[T] - - /** - * @param keyer key selector function - * @return - */ - def groupByKey(keyer:T => Any):StreamProducer[T] - - def streamUnion[T2,T3](otherStreams : Seq[StreamProducer[T2]]) : StreamProducer[T3] - def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean,strategy : PartitionStrategy):AlertStreamProducer - - def aggregate(upStreamNames: java.util.List[String], executorId :String, strategy:PartitionStrategy): StreamProducer[T] - - def aggregateDirect(upStreamNames: java.util.List[String], cql : String, strategy:PartitionStrategy): StreamProducer[T] - - def persist(executorId : String, storageType: StorageType.StorageType): StreamProducer[T] - - /** - * Set processing element parallelism setting - * @param parallelismNum parallelism value - * @return - */ - def parallelism(parallelismNum : Int) : StreamProducer[T] - def parallelism : Int - /** - * Set component name - * - * @param componentName - * @return - */ - def nameAs(componentName : String) : StreamProducer[T] - - /** - * Set stream name - * @param streamId stream ID - * @return - */ - def stream(streamId: String): StreamProducer[T] - def stream: String - - def ? (fn:T => Boolean):StreamProducer[T] = this.filter(fn) - def ~>[R](flatMapper : FlatMapper[R]) = this.flatMap[R](flatMapper) - def ! (upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean = true,strategy: PartitionStrategy = null) = alert(upStreamNames, alertExecutorId, consume,strategy) - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala deleted file mode 100644 index 5f3bd22..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.core - -import org.jgrapht.experimental.dag.DirectedAcyclicGraph - -import scala.reflect.runtime.{universe => ru} - -/** - * @since 12/7/15 - */ -trait StreamSourceBuilder { - def config:Configuration - - /** - * Business logic DAG - * @return - */ - def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]] - - /** - * - * @param iterable top level Iterable interface - * @param recycle - * @tparam T - * @return - */ - def from[T:ru.TypeTag](iterable: Iterable[T],recycle:Boolean = false):IterableStreamProducer[T]={ - val p = IterableStreamProducer[T](iterable,recycle) - p.initWith(dag,config.get) - p - } - - def from[T:ru.TypeTag](iterator: Iterator[T],recycle:Boolean):IteratorStreamProducer[T]={ - val p = IteratorStreamProducer[T](iterator) - p.initWith(dag,config.get) - p - } - - def from(product: Product):IteratorStreamProducer[Any]={ - val p = IteratorStreamProducer[Any](product.productIterator) - p.initWith(dag,config.get) - p - } - - def register[T](producer:StreamProducer[T]):Unit = { - producer.initWith(dag,config.get) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala deleted file mode 100644 index 351782b..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamUnionExpansion.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ - -package org.apache.eagle.datastream.core - -import com.typesafe.config.Config -import org.jgrapht.experimental.dag.DirectedAcyclicGraph -import org.slf4j.LoggerFactory - -import scala.collection.JavaConversions._ -import scala.collection.mutable.ListBuffer - -/** - * union operator should be expanded - */ -case class StreamUnionExpansion(config: Config) extends StreamDAGExpansion(config){ - val LOG = LoggerFactory.getLogger(classOf[StreamUnionExpansion]) - - override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) = { - val iter = dag.iterator() - var toBeAddedEdges = new ListBuffer[StreamConnector[Any,Any]] - var toBeRemovedVertex = new ListBuffer[StreamProducer[Any]] - while(iter.hasNext) { - val current = iter.next() - dag.outgoingEdgesOf(current).foreach(edge => { - val child = edge.to - child match { - case StreamUnionProducer(others) => { - dag.outgoingEdgesOf(child).foreach(c2 => { - toBeAddedEdges += StreamConnector(current, c2.to, edge) - others.foreach(o => { - toBeAddedEdges += StreamConnector(o, c2.to, edge) - }) - }) - toBeRemovedVertex += child - } - case _ => - } - }) - } - - // add back edges - toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e)) - toBeRemovedVertex.foreach(v => dag.removeVertex(v)) - } -} - -object StreamUnionExpansion{ - def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamUnionExpansion ={ - val e = StreamUnionExpansion(config) - e.expand(dag) - e - } -} - http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala deleted file mode 100644 index 64b5f0f..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.storm - -import java.util - -import backtype.storm.task.{OutputCollector, TopologyContext} -import backtype.storm.topology.OutputFieldsDeclarer -import backtype.storm.topology.base.BaseRichBolt -import backtype.storm.tuple.{Fields, Tuple} -import org.apache.eagle.datastream.core.StreamInfo -import org.apache.eagle.datastream.utils.NameConstants -import org.slf4j.LoggerFactory - -/** - * - * @param fieldsNum zero-fieldsNum may means something different - * @param ack - * @param streamInfo - * @tparam T - */ -abstract class AbstractStreamBolt[T](val fieldsNum:Int=1, val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt{ - private var _collector: OutputCollector = null - private val LOG = LoggerFactory.getLogger(classOf[AbstractStreamBolt[T]]) - - /** - * If outKeyed then - * Fields = ("key","value"] - * elsif num > 0 - * Fields = ["f0","f1",..,"fn"] - * elsif num == 0 - * Fields = ["f0"] - * end - * - * @param declarer - */ - override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = { - if(streamInfo.outKeyed) { - declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE)) - }else{ - if(fieldsNum > 0) { - val fields = new util.ArrayList[String]() - var i: Int = 0 - while (i < fieldsNum) { - fields.add(NameConstants.FIELD_PREFIX + i) - i += 1 - } - declarer.declare(new Fields(fields)) - }else if(fieldsNum == 0){ - declarer.declare(new Fields(NameConstants.FIELD_PREFIX + 0)) - } - } - } - - def emit(values:util.List[AnyRef])(implicit input:Tuple){ - if (streamInfo.outKeyed) { - _collector.emit(input, util.Arrays.asList(streamInfo.keySelector.key(values).asInstanceOf[AnyRef], values)) - } else { - _collector.emit(input, values) - } - } - - def emit(value:Any)(implicit input:Tuple){ - if(streamInfo.outKeyed) { - _collector.emit(input, util.Arrays.asList(streamInfo.keySelector.key(value).asInstanceOf[AnyRef],value.asInstanceOf[AnyRef])) - }else{ - _collector.emit(input,util.Arrays.asList(value.asInstanceOf[AnyRef])) - } - } - - override def execute(input: Tuple): Unit = { - try { - implicit val _input = input - if (streamInfo.inKeyed) { - val key = input.getValueByField(NameConstants.FIELD_KEY) - val value = input.getValueByField(NameConstants.FIELD_VALUE).asInstanceOf[T] - onKeyValue(key, value) - } else { - onValues(input.getValues) - } - if(ack) _collector.ack(input) - }catch { - case t: Throwable => { - LOG.error(s"Got exception when processing $input",t) - _collector.fail(input) - } - } - } - - /** - * Handle keyed stream value - */ - def onKeyValue(key:Any,value:T)(implicit input:Tuple) - - /** - * Handle general stream values list - * - * @param values - */ - def onValues(values:util.List[AnyRef])(implicit input:Tuple) - - override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = { - _collector = collector - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala deleted file mode 100644 index b175b41..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/FilterBoltWrapper.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.storm - -import java.util - -import backtype.storm.tuple.Tuple -import org.apache.eagle.datastream.core.StreamInfo - -case class FilterBoltWrapper(fn:Any => Boolean)(implicit info:StreamInfo) extends AbstractStreamBolt[Any](fieldsNum = 1){ - /** - * Handle keyed stream value - */ - override def onKeyValue(key: Any, value: Any)(implicit input:Tuple): Unit = { - if(fn(value)) emit(value) - } - - /** - * Handle general stream values list - * - * @param values - */ - override def onValues(values: util.List[AnyRef])(implicit input:Tuple): Unit = { - val value = values.get(0) - if(fn(value)) emit(value) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala deleted file mode 100644 index 4c105b2..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/ForeachBoltWrapper.scala +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.storm - -import java.util - -import backtype.storm.tuple.Tuple -import org.apache.eagle.datastream.core.StreamInfo - -/** - * @since 12/6/15 - */ -case class ForeachBoltWrapper(fn:Any=>Unit)(implicit info:StreamInfo) extends AbstractStreamBolt[Any] { - /** - * Handle keyed stream value - * @param value - */ - override def onKeyValue(key:Any,value: Any)(implicit input:Tuple): Unit = { - fn(value) - } - - /** - * Handle non-keyed stream values list - * - * @param values - */ - override def onValues(values: util.List[AnyRef])(implicit input:Tuple): Unit = { - fn(values) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala deleted file mode 100644 index c64ea83..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.storm - -import java.util - -import backtype.storm.spout.SpoutOutputCollector -import backtype.storm.task.TopologyContext -import backtype.storm.topology.OutputFieldsDeclarer -import backtype.storm.topology.base.BaseRichSpout -import backtype.storm.tuple.Fields -import backtype.storm.utils.Utils -import org.apache.eagle.datastream.core.StreamInfo -import org.apache.eagle.datastream.utils.NameConstants -import org.slf4j.LoggerFactory - -import scala.collection.JavaConverters._ - -/** - * @since 12/6/15 - */ -case class IterableStreamSpout(iterable: Iterable[Any],recycle:Boolean = true)(implicit info:StreamInfo) extends BaseRichSpout { - val LOG = LoggerFactory.getLogger(classOf[IterableStreamSpout]) - var _collector:SpoutOutputCollector=null - var _iterator:Iterator[Any] = null - - override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = { - this._collector = collector - this._iterator = iterable.iterator - } - - override def nextTuple(): Unit = { - if(_iterator.hasNext){ - val current = _iterator.next().asInstanceOf[AnyRef] - if(info.outKeyed) { - _collector.emit(List(info.keySelector.key(current),current).asJava.asInstanceOf[util.List[AnyRef]]) - }else{ - _collector.emit(List(current).asJava) - } - }else if(recycle){ - if(LOG.isDebugEnabled) LOG.debug("Recycling the iterator") - _iterator = iterable.iterator - }else{ - if(LOG.isDebugEnabled) LOG.debug("No tuple left, sleep forever") - this.deactivate() - Utils.sleep(Long.MaxValue) - } - } - - override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = { - if(info.outKeyed) { - declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE)) - }else{ - declarer.declare(new Fields(s"${NameConstants.FIELD_PREFIX}0")) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala deleted file mode 100644 index ea6d658..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.storm - -import java.util - -import backtype.storm.spout.SpoutOutputCollector -import backtype.storm.task.TopologyContext -import backtype.storm.topology.OutputFieldsDeclarer -import backtype.storm.topology.base.BaseRichSpout -import backtype.storm.tuple.Fields -import backtype.storm.utils.Utils -import org.apache.eagle.datastream.core.StreamInfo -import org.apache.eagle.datastream.utils.NameConstants -import org.slf4j.LoggerFactory - -import scala.collection.JavaConverters._ - -case class IteratorStreamSpout(iterator: Iterator[Any])(implicit info:StreamInfo) extends BaseRichSpout { - val LOG = LoggerFactory.getLogger(classOf[IterableStreamSpout]) - var _collector:SpoutOutputCollector=null - var _iterator:Iterator[Any] = null - - override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = { - this._collector = collector - this._iterator = iterator - } - - override def nextTuple(): Unit = { - if(_iterator.hasNext){ - val current = _iterator.next().asInstanceOf[AnyRef] - if(info.outKeyed) { - _collector.emit(List(info.keySelector.key(current),current).asJava.asInstanceOf[util.List[AnyRef]]) - }else{ - _collector.emit(List(current).asJava) - } - }else{ - LOG.info("No tuple left, sleep forever") - this.deactivate() - Utils.sleep(Long.MaxValue) - } - } - - override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = { - if(info.outKeyed) { - declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE)) - }else{ - declarer.declare(new Fields(s"${NameConstants.FIELD_PREFIX}0")) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala deleted file mode 100644 index 802c782..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JavaStormBoltWrapper.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.storm - -import java.util - -import backtype.storm.task.{OutputCollector, TopologyContext} -import backtype.storm.topology.OutputFieldsDeclarer -import backtype.storm.topology.base.BaseRichBolt -import backtype.storm.tuple.{Fields, Tuple} -import org.apache.eagle.datastream.{Collector, JavaStormStreamExecutor} -import org.slf4j.LoggerFactory - -case class JavaStormBoltWrapper(worker : JavaStormStreamExecutor[AnyRef]) extends BaseRichBolt{ - val LOG = LoggerFactory.getLogger(StormBoltWrapper.getClass) - var _collector : OutputCollector = null - - override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = { - _collector = collector - worker.init - } - - override def execute(input : Tuple): Unit ={ - worker.flatMap(input.getValues, new Collector[AnyRef](){ - def collect(t: AnyRef): Unit ={ - _collector.emit(input, StormWrapperUtils.productAsJavaList(t.asInstanceOf[Product])) - } - }) - _collector.ack(input) - } - - override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={ - val fields = worker.fields - LOG.info("output fields for worker " + worker + " : " + fields.toList) - declarer.declare(new Fields(fields:_*)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala deleted file mode 100644 index 19305fa..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.storm - -import java.io.IOException -import java.util -import java.util.Properties - -import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer -import org.slf4j.{Logger, LoggerFactory} - -/** - * @since 11/6/15 - */ -case class JsonMessageDeserializer(props:Properties) extends SpoutKafkaMessageDeserializer{ - private val objectMapper: ObjectMapper = new ObjectMapper - private val LOG: Logger = LoggerFactory.getLogger(classOf[JsonMessageDeserializer]) - - override def deserialize(bytes: Array[Byte]): AnyRef = { - var map: util.Map[String, _] = null - if(bytes.length == 0 || bytes == null){ - if(LOG.isDebugEnabled) LOG.warn("Skip empty message") - }else { - try { - map = objectMapper.readValue(bytes, classOf[util.TreeMap[String, _]]) - } catch { - case e: IOException => { - LOG.error("Failed to deserialize json from: " + new String(bytes), e) - } - } - } - map - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala deleted file mode 100644 index 8c92590..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/KafkaStreamMonitor.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.storm - -import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider -import org.apache.eagle.datastream.ExecutionEnvironments - -class KafkaStreamMonitorApp extends App { - val env = ExecutionEnvironments.get[StormExecutionEnvironment](args) - val streamName = env.config.get[String]("eagle.stream.name","eventStream") - val streamExecutorId = env.config.get[String]("eagle.stream.executor",s"${streamName}Executor") - env.config.set("dataSourceConfig.deserializerClass",classOf[JsonMessageDeserializer].getCanonicalName) - env.fromSpout(new KafkaSourcedSpoutProvider()).parallelism(1).nameAs(streamName) ! (Seq(streamName),streamExecutorId) - env.execute() -} - -object KafkaStreamMonitor extends KafkaStreamMonitorApp \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala deleted file mode 100644 index 1119e08..0000000 --- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/MapBoltWrapper.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.datastream.storm - -import java.util - -import backtype.storm.tuple.Tuple -import org.apache.eagle.datastream.core.StreamInfo - -/** - * @param num if num is zero, then means that it's using type-safe way, because to map operation, it must require at least one output field - * @param fn - * @param streamInfo - */ -case class MapBoltWrapper(num: Int, fn: Any => Any)(implicit streamInfo: StreamInfo) extends AbstractStreamBolt[Any](fieldsNum = num){ - /** - * Handle keyed stream value - */ - override def onKeyValue(key: Any, value: Any)(implicit input:Tuple): Unit = { - emit(fn(value)) - } - - /** - * Handle general stream values list - * - * @param values - */ - override def onValues(values: util.List[AnyRef])(implicit input:Tuple): Unit = { - val size = values.size() - if(size == 0) return - if(num == 0) { - emit(fn(values.get(0))) - } else { - var tuple: AnyRef = null - size match { - case 1 => tuple = scala.Tuple1[AnyRef](values.get(0)) - case 2 => tuple = scala.Tuple2(values.get(0), values.get(1)) - case 3 => tuple = scala.Tuple3(values.get(0), values.get(1), values.get(2)) - case 4 => tuple = scala.Tuple4(values.get(0), values.get(1), values.get(2), values.get(3)) - case _ => throw new IllegalArgumentException(s"Exceed max supported tuple size $size > 4") - } - val output = fn(tuple) - output match { - case scala.Tuple1(a) => emit(util.Arrays.asList(a.asInstanceOf[AnyRef])) - case scala.Tuple2(a, b) => emit(util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef])) - case scala.Tuple3(a, b, c) => emit(util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef])) - case scala.Tuple4(a, b, c, d) => emit(util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef], d.asInstanceOf[AnyRef])) - case a => emit(util.Arrays.asList(a.asInstanceOf[AnyRef])) - } - } - } -} \ No newline at end of file
