[GEARPUMP-262] Add setup and teardown to user defined functions Author: manuzhang <[email protected]>
Closes #131 from manuzhang/setup_teardown. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/a23a40f5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/a23a40f5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/a23a40f5 Branch: refs/heads/master Commit: a23a40f5e558a1a5f10503f80204d9c6e690e0bd Parents: 385a612 Author: manuzhang <[email protected]> Authored: Fri Jan 13 17:11:54 2017 +0800 Committer: manuzhang <[email protected]> Committed: Fri Jan 13 17:12:38 2017 +0800 ---------------------------------------------------------------------- .../examples/kafka/dsl/KafkaReadWrite.scala | 4 +- .../examples/wordcountjava/dsl/WordCount.java | 47 ++- .../wordcount/dsl/WindowedWordCount.scala | 2 +- .../examples/wordcount/dsl/WordCount.scala | 4 +- .../external/hbase/dsl/HBaseDSLSink.scala | 5 +- .../gearpump/streaming/kafka/dsl/KafkaDSL.scala | 12 +- .../javaapi/dsl/functions/FilterFunction.java | 30 -- .../javaapi/dsl/functions/FlatMapFunction.java | 32 --- .../javaapi/dsl/functions/GroupByFunction.java | 31 -- .../javaapi/dsl/functions/MapFunction.java | 31 -- .../javaapi/dsl/functions/ReduceFunction.java | 30 -- .../apache/gearpump/streaming/dsl/Stream.scala | 245 ---------------- .../gearpump/streaming/dsl/StreamApp.scala | 109 ------- .../dsl/api/functions/FilterFunction.scala | 42 +++ .../dsl/api/functions/MapFunction.scala | 43 +++ .../dsl/api/functions/ReduceFunction.scala | 42 +++ .../streaming/dsl/javaapi/JavaStream.scala | 18 +- .../streaming/dsl/javaapi/JavaStreamApp.scala | 5 +- .../dsl/javaapi/functions/FlatMapFunction.scala | 32 +++ .../dsl/javaapi/functions/GroupByFunction.scala | 28 ++ .../apache/gearpump/streaming/dsl/plan/OP.scala | 4 +- .../plan/functions/SingleInputFunction.scala | 66 +++-- .../streaming/dsl/scalaapi/Stream.scala | 287 +++++++++++++++++++ .../streaming/dsl/scalaapi/StreamApp.scala | 109 +++++++ 
.../scalaapi/functions/FlatMapFunction.scala | 103 +++++++ .../functions/SerializableFunction.scala | 32 +++ .../streaming/dsl/task/TransformTask.scala | 5 +- .../dsl/window/impl/WindowRunner.scala | 16 +- .../gearpump/streaming/dsl/StreamAppSpec.scala | 72 ----- .../gearpump/streaming/dsl/StreamSpec.scala | 128 --------- .../gearpump/streaming/dsl/plan/OpSpec.scala | 13 +- .../streaming/dsl/plan/PlannerSpec.scala | 15 +- .../functions/SingleInputFunctionSpec.scala | 202 ++++++------- .../streaming/dsl/scalaapi/StreamAppSpec.scala | 73 +++++ .../streaming/dsl/scalaapi/StreamSpec.scala | 129 +++++++++ 35 files changed, 1154 insertions(+), 892 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala index 49d3619..cbfe57a 100644 --- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala @@ -21,8 +21,8 @@ package org.apache.gearpump.streaming.examples.kafka.dsl import java.util.Properties import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.main.{CLIOption, ArgumentsParser} -import org.apache.gearpump.streaming.dsl.StreamApp +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp import org.apache.gearpump.streaming.kafka.KafkaStoreFactory import 
org.apache.gearpump.streaming.kafka.dsl.KafkaDSL import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java index d4866ed..2942861 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java @@ -25,12 +25,17 @@ import org.apache.gearpump.cluster.UserConfig; import org.apache.gearpump.cluster.client.ClientContext; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction; +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction; +import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction; +import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction; import org.apache.gearpump.streaming.source.DataSource; import org.apache.gearpump.streaming.task.TaskContext; import scala.Tuple2; import java.time.Instant; import java.util.Arrays; +import java.util.Iterator; /** Java version of WordCount with high level DSL API */ public class WordCount { @@ -46,15 +51,13 @@ public class WordCount { JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! 
bingo!!"), 1, UserConfig.empty(), "source"); - JavaStream<String> words = sentence.flatMap(s -> Arrays.asList(s.split("\\s+")).iterator(), - "flatMap"); + JavaStream<String> words = sentence.flatMap(new Split(), "flatMap"); - JavaStream<Tuple2<String, Integer>> ones = words.map(s -> new Tuple2<>(s, 1), "map"); + JavaStream<Tuple2<String, Integer>> ones = words.map(new Ones(), "map"); - JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(Tuple2::_1, 1, "groupBy"); + JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new TupleKey(), 1, "groupBy"); - JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce( - (t1, t2) -> new Tuple2<>(t1._1(), t1._2() + t2._2()), "reduce"); + JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new Count(), "reduce"); wordcount.log(); @@ -88,4 +91,36 @@ public class WordCount { return Instant.now(); } } + + private static class Split extends FlatMapFunction<String, String> { + + @Override + public Iterator<String> apply(String s) { + return Arrays.asList(s.split("\\s+")).iterator(); + } + } + + private static class Ones extends MapFunction<String, Tuple2<String, Integer>> { + + @Override + public Tuple2<String, Integer> apply(String s) { + return new Tuple2<>(s, 1); + } + } + + private static class Count extends ReduceFunction<Tuple2<String, Integer>> { + + @Override + public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { + return new Tuple2<>(t1._1(), t1._2() + t2._2()); + } + } + + private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> { + + @Override + public String apply(Tuple2<String, Integer> tuple) { + return tuple._1(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala ---------------------------------------------------------------------- diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala index 4f43fd4..401eac0 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala @@ -22,7 +22,7 @@ import java.time.{Duration, Instant} import org.apache.gearpump.Message import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp} +import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp} import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow} import org.apache.gearpump.streaming.source.DataSource import org.apache.gearpump.streaming.task.TaskContext http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala index 22f597c..1cbfb22 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala @@ -20,8 +20,8 @@ package org.apache.gearpump.streaming.examples.wordcount.dsl import org.apache.gearpump.cluster.client.ClientContext import 
org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import org.apache.gearpump.streaming.dsl.StreamApp -import org.apache.gearpump.streaming.dsl.StreamApp._ +import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp +import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp._ import org.apache.gearpump.util.AkkaApp /** Same WordCount with High level DSL syntax */ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala ---------------------------------------------------------------------- diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala index 2417763..22efa89 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala @@ -18,13 +18,10 @@ package org.apache.gearpump.external.hbase.dsl import scala.language.implicitConversions - import org.apache.hadoop.conf.Configuration - import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.external.hbase.HBaseSink -import org.apache.gearpump.streaming.dsl.Stream -import org.apache.gearpump.streaming.dsl.Stream.Sink +import org.apache.gearpump.streaming.dsl.scalaapi.Stream /** Create a HBase DSL Sink */ class HBaseDSLSink[T](stream: Stream[T]) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala index f1bb26a..996ae0b 100644 --- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala @@ -21,7 +21,7 @@ import java.util.Properties import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl -import org.apache.gearpump.streaming.dsl.StreamApp +import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, StreamApp} import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource} import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory @@ -44,7 +44,7 @@ object KafkaDSL { parallelism: Int = 1, config: UserConfig = UserConfig.empty, description: String = "KafkaSource" - ): dsl.Stream[T] = { + ): Stream[T] = { app.source[T](new KafkaSource(topics, properties), parallelism, config, description) } @@ -66,19 +66,19 @@ object KafkaDSL { properties: Properties, parallelism: Int = 1, config: UserConfig = UserConfig.empty, - description: String = "KafkaSource"): dsl.Stream[T] = { + description: String = "KafkaSource"): Stream[T] = { val source = new KafkaSource(topics, properties) source.setCheckpointStore(checkpointStoreFactory) app.source[T](source, parallelism, config, description) } import scala.language.implicitConversions - implicit def streamToKafkaDSL[T](stream: dsl.Stream[T]): KafkaDSL[T] = { + implicit def streamToKafkaDSL[T](stream: Stream[T]): KafkaDSL[T] = { new KafkaDSL[T](stream) } } -class KafkaDSL[T](stream: dsl.Stream[T]) { +class KafkaDSL[T](stream: Stream[T]) { /** * Sinks data to Kafka @@ -94,7 +94,7 @@ class KafkaDSL[T](stream: dsl.Stream[T]) { properties: Properties, parallelism: Int = 1, userConfig: UserConfig = UserConfig.empty, - description: String = "KafkaSink"): dsl.Stream[T] = { + description: String = "KafkaSink"): Stream[T] = { stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, description) } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java deleted file mode 100644 index f07ceff..0000000 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * Filter function - * - * @param <T> Message of type T - */ -public interface FilterFunction<T> extends Serializable { - boolean apply(T t); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java deleted file mode 100644 index 9788dd2..0000000 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; -import java.util.Iterator; - -/** - * Function that converts a value of type T to a iterator of values of type R. 
- * - * @param <T> Input value type - * @param <R> Return value type - */ -public interface FlatMapFunction<T, R> extends Serializable { - Iterator<R> apply(T t); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java deleted file mode 100644 index 6c71280..0000000 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * GroupBy function which assign value of type T to groups - * - * @param <T> Input value type - * @param <Group> Group Type - */ -public interface GroupByFunction<T, Group> extends Serializable { - Group apply(T t); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java deleted file mode 100644 index e1fc821..0000000 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * Function that map a value of type T to value of type R - * - * @param <T> Input value type - * @param <R> Output value type - */ -public interface MapFunction<T, R> extends Serializable { - R apply(T t); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java deleted file mode 100644 index 2bcac60..0000000 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * Function that applies reduce operation - * - * @param <T> Input value type - */ -public interface ReduceFunction<T> extends Serializable { - T apply(T t1, T t2); -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala deleted file mode 100644 index 440a45e..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.dsl - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl.plan._ -import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} -import org.apache.gearpump.streaming.dsl.window.api._ -import org.apache.gearpump.streaming.dsl.window.impl._ -import org.apache.gearpump.streaming.sink.DataSink -import org.apache.gearpump.streaming.task.{Task, TaskContext} -import org.apache.gearpump.util.Graph -import org.slf4j.{Logger, LoggerFactory} - -import scala.language.implicitConversions - -class Stream[T]( - private val graph: Graph[Op, OpEdge], private val thisNode: Op, - private val edge: Option[OpEdge] = None) { - - /** - * converts a value[T] to a list of value[R] - * - * @param fn FlatMap function - * @param description The description message for this operation - * @return A new stream with type [R] - */ - def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = { - val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description)) - graph.addVertex(flatMapOp) - graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp) - new Stream[R](graph, flatMapOp) - } - - /** - * Maps message of type T message of type R - * - * @param fn Function - * @return A new stream with type [R] - */ - def map[R](fn: T => R, description: String = "map"): Stream[R] = { - this.flatMap({ data => - Option(fn(data)) - }, description) - } - - /** - * Keeps records when fun(T) == true - * - * @param fn the filter - * @return a new stream after filter - */ - def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = { - this.flatMap({ data => - if (fn(data)) Option(data) else None - }, description) - } - - /** - * Reduces operations. 
- * - * @param fn reduction function - * @param description description message for this operator - * @return a new stream after reduction - */ - def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = { - val reduceOp = ChainableOp(new ReduceFunction(fn, description)) - graph.addVertex(reduceOp) - graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp) - new Stream(graph, reduceOp) - } - - /** - * Log to task log file - */ - def log(): Unit = { - this.map(msg => { - LoggerFactory.getLogger("dsl").info(msg.toString) - msg - }, "log") - } - - /** - * Merges data from two stream into one - * - * @param other the other stream - * @return the merged stream - */ - def merge(other: Stream[T], description: String = "merge"): Stream[T] = { - val mergeOp = MergeOp(description, UserConfig.empty) - graph.addVertex(mergeOp) - graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp) - graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp) - new Stream[T](graph, mergeOp) - } - - /** - * Group by function (T => Group) - * - * For example, we have T type, People(name: String, gender: String, age: Int) - * groupBy[People](_.gender) will group the people by gender. - * - * You can append other combinators after groupBy - * - * For example, - * {{{ - * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..) 
- * }}} - * - * @param fn Group by function - * @param parallelism Parallelism level - * @param description The description - * @return the grouped stream - */ - def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, - description: String = "groupBy"): Stream[T] = { - window(CountWindow.apply(1).accumulating) - .groupBy[GROUP](fn, parallelism, description) - } - - /** - * Window function - * - * @param win window definition - * @param description window description - * @return [[WindowStream]] where groupBy could be applied - */ - def window(win: Window, description: String = "window"): WindowStream[T] = { - new WindowStream[T](graph, edge, thisNode, win, description) - } - - /** - * Connects with a low level Processor(TaskDescription) - * - * @param processor a user defined processor - * @param parallelism parallelism level - * @return new stream after processing with type [R] - */ - def process[R]( - processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty, - description: String = "process"): Stream[R] = { - val processorOp = ProcessorOp(processor, parallelism, conf, description) - graph.addVertex(processorOp) - graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp) - new Stream[R](graph, processorOp, Some(Shuffle)) - } -} - -class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op, - window: Window, winDesc: String) { - - def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, - description: String = "groupBy"): Stream[T] = { - val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window) - val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism, - s"$winDesc.$description") - graph.addVertex(groupOp) - graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) - new Stream[T](graph, groupOp) - } -} - -class KVStream[K, V](stream: Stream[Tuple2[K, V]]) { - /** - * GroupBy key - * - * Applies to Stream[Tuple2[K,V]] - * - * @param parallelism the parallelism for this 
operation - * @return the new KV stream - */ - def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = { - stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey") - } - - /** - * Sum the value of the tuples - * - * Apply to Stream[Tuple2[K,V]], V must be of type Number - * - * For input (key, value1), (key, value2), will generate (key, value1 + value2) - * @param numeric the numeric operations - * @return the sum stream - */ - def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = { - stream.reduce(Stream.sumByKey[K, V](numeric), "sum") - } -} - -object Stream { - - def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = { - new Stream[T](graph, node, edge) - } - - def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1 - - def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] - = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2)) - - implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = { - new KVStream(stream) - } - - implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable { - def sink(dataSink: DataSink, parallelism: Int = 1, - conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = { - implicit val sink = DataSinkOp(dataSink, parallelism, conf, description) - stream.graph.addVertex(sink) - stream.graph.addEdge(stream.thisNode, Shuffle, sink) - new Stream[T](stream.graph, sink) - } - } -} - -class LoggerSink[T] extends DataSink { - var logger: Logger = _ - - override def open(context: TaskContext): Unit = { - this.logger = context.logger - } - - override def write(message: Message): Unit = { - logger.info("logging message " + message.msg) - } - - override def close(): Unit = Unit -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala deleted file mode 100644 index 8116146..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.dsl - -import java.time.Instant - -import akka.actor.ActorSystem -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.streaming.StreamApplication -import org.apache.gearpump.streaming.dsl.plan._ -import org.apache.gearpump.streaming.source.DataSource -import org.apache.gearpump.streaming.task.TaskContext -import org.apache.gearpump.util.Graph -import org.apache.gearpump.Message - -import scala.language.implicitConversions - -/** - * Example: - * {{{ - * val data = "This is a good start, bingo!! bingo!!" - * app.fromCollection(data.lines.toList). - * // word => (word, count) - * flatMap(line => line.split("[\\s]+")).map((_, 1)). 
- * // (word, count1), (word, count2) => (word, count1 + count2) - * groupBy(kv => kv._1).reduce(sum(_, _)) - * - * val appId = context.submit(app) - * context.close() - * }}} - * - * @param name name of app - */ -class StreamApp( - name: String, system: ActorSystem, userConfig: UserConfig, - private val graph: Graph[Op, OpEdge]) { - - def this(name: String, system: ActorSystem, userConfig: UserConfig) = { - this(name, system, userConfig, Graph.empty[Op, OpEdge]) - } - - def plan(): StreamApplication = { - implicit val actorSystem = system - val planner = new Planner - val dag = planner.plan(graph) - StreamApplication(name, dag, userConfig) - } -} - -object StreamApp { - def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty) - : StreamApp = { - new StreamApp(name, context.system, userConfig) - } - - implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = { - streamApp.plan() - } - - implicit class Source(app: StreamApp) extends java.io.Serializable { - - def source[T](dataSource: DataSource, parallelism: Int = 1, - conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { - implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description) - app.graph.addVertex(sourceOp) - new Stream[T](app.graph, sourceOp) - } - - def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { - this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) - } - } -} - -/** A test message source which generated message sequence repeatedly. 
*/ -class CollectionDataSource[T](seq: Seq[T]) extends DataSource { - private lazy val iterator: Iterator[T] = seq.iterator - - override def open(context: TaskContext, startTime: Instant): Unit = {} - - override def read(): Message = { - if (iterator.hasNext) { - Message(iterator.next(), Instant.now().toEpochMilli) - } else { - null - } - } - - override def close(): Unit = {} - - override def getWatermark: Instant = Instant.now() -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala new file mode 100644 index 0000000..e4e7309 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.api.functions + +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction + +object FilterFunction { + + def apply[T](fn: T => Boolean): FilterFunction[T] = { + new FilterFunction[T] { + override def apply(t: T): Boolean = { + fn(t) + } + } + } +} + +/** + * Returns true to keep the input and false otherwise. + * + * @param T Input value type + */ +abstract class FilterFunction[T] extends SerializableFunction { + + def apply(t: T): Boolean + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala new file mode 100644 index 0000000..70fe9d4 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.api.functions + +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction + +object MapFunction { + + def apply[T, R](fn: T => R): MapFunction[T, R] = { + new MapFunction[T, R] { + override def apply(t: T): R = { + fn(t) + } + } + } +} + +/** + * Transforms an input into an output of possibly different types. + * + * @param T Input value type + * @param R Output value type + */ +abstract class MapFunction[T, R] extends SerializableFunction { + + def apply(t: T): R + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala new file mode 100644 index 0000000..25b12be --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.api.functions + +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction + +object ReduceFunction { + + def apply[T](fn: (T, T) => T): ReduceFunction[T] = { + new ReduceFunction[T] { + override def apply(t1: T, t2: T): T = { + fn(t1, t2) + } + } + } +} + +/** + * Combines two inputs into one output of the same type. + * + * @param T Type of both inputs and output + */ +abstract class ReduceFunction[T] extends SerializableFunction { + + def apply(t1: T, t2: T): T + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala index f2654ea..7f3c250 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -15,14 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.gearpump.streaming.dsl.javaapi -import scala.collection.JavaConverters._ import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction +import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream} import org.apache.gearpump.streaming.dsl.window.api.Window -import org.apache.gearpump.streaming.dsl.{Stream, WindowStream} -import org.apache.gearpump.streaming.javaapi.dsl.functions._ import org.apache.gearpump.streaming.task.Task /** @@ -31,23 +31,23 @@ import org.apache.gearpump.streaming.task.Task class JavaStream[T](val stream: Stream[T]) { /** FlatMap on stream */ - def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = { - new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description)) + def flatMap[R](fn: JFlatMapFunction[T, R], description: String): JavaStream[R] = { + new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description)) /* forward the caller's description, as map/filter/reduce do */ } /** Map on stream */ def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = { - new JavaStream[R](stream.map({ t: T => fn(t) }, description)) + new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description)) } /** Only keep the messages that FilterFunction returns true.
*/ def filter(fn: FilterFunction[T], description: String): JavaStream[T] = { - new JavaStream[T](stream.filter({ t: T => fn(t) }, description)) + new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description)) } /** Does aggregation on the stream */ def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = { - new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, description)) + new JavaStream[T](stream.reduce(fn, description)) } def log(): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala index 82a284e..b8d1f4c 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala @@ -19,13 +19,14 @@ package org.apache.gearpump.streaming.dsl.javaapi import java.util.Collection -import scala.collection.JavaConverters._ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.streaming.dsl.{CollectionDataSource, StreamApp} +import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, StreamApp} import org.apache.gearpump.streaming.source.DataSource +import scala.collection.JavaConverters._ + class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) { private val streamApp = StreamApp(name, context, userConfig) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala new file mode 100644 index 0000000..85d597d --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.javaapi.functions + +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction + +/** + * Transforms one input into zero or more outputs of possibly different types. + * This Java version of FlatMapFunction returns a java.util.Iterator. 
+ * + * @param T Input value type + * @param R Output value type + */ +abstract class FlatMapFunction[T, R] extends SerializableFunction { + + def apply(t: T): java.util.Iterator[R] +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala new file mode 100644 index 0000000..7656cba --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.javaapi.functions + +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction + +/** + * Assigns the input value into a group. 
+ * + * @param T Input value type + * @param GROUP Group value type + */ +abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index f15d875..82ea7c7 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -22,7 +22,7 @@ import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.Processor.DefaultProcessor -import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, SingleInputFunction} import org.apache.gearpump.streaming.{Constants, Processor} import org.apache.gearpump.streaming.dsl.task.TransformTask import org.apache.gearpump.streaming.dsl.window.api.GroupByFn @@ -134,7 +134,7 @@ case class ChainableOp[IN, OUT]( other match { case op: ChainableOp[OUT, _] => // TODO: preserve type info - ChainableOp(fn.andThen(op.fn)) + ChainableOp(AndThen(fn, op.fn)) case _ => throw new OpChainException(this, other) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala index 5322648..687fd2e 100644 
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala @@ -17,23 +17,35 @@ */ package org.apache.gearpump.streaming.dsl.plan.functions -trait SingleInputFunction[IN, OUT] extends Serializable { +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction + +/** + * Internal function to process single input + * + * @param IN input value type + * @param OUT output value type + */ +sealed trait SingleInputFunction[IN, OUT] extends java.io.Serializable { + + def setup(): Unit = {} + def process(value: IN): TraversableOnce[OUT] - def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { - AndThen(this, other) - } + def finish(): TraversableOnce[OUT] = None - def clearState(): Unit = {} + + def teardown(): Unit = {} + def description: String } -case class AndThen[IN, MIDDLE, OUT]( - first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT]) +case class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE], + second: SingleInputFunction[MIDDLE, OUT]) extends SingleInputFunction[IN, OUT] { - override def andThen[OUTER]( - other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { - first.andThen(second.andThen(other)) + override def setup(): Unit = { + first.setup() + second.setup() } override def process(value: IN): TraversableOnce[OUT] = { @@ -49,9 +61,9 @@ case class AndThen[IN, MIDDLE, OUT]( } } - override def clearState(): Unit = { - first.clearState() - second.clearState() + override def teardown(): Unit = { + first.teardown() + second.teardown() } override def description: String = { @@ -61,22 +73,31 @@ case class AndThen[IN, MIDDLE, OUT]( } } -class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String) +class 
FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String) extends SingleInputFunction[IN, OUT] { + override def setup(): Unit = { + fn.setup() + } + override def process(value: IN): TraversableOnce[OUT] = { fn(value) } - override def description: String = descriptionMessage + override def teardown(): Unit = { + fn.teardown() + } } - -class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String) +class Reducer[T](fn: ReduceFunction[T], val description: String) extends SingleInputFunction[T, T] { private var state: Option[T] = None + override def setup(): Unit = { + fn.setup() + } + override def process(value: T): TraversableOnce[T] = { if (state.isEmpty) { state = Option(value) @@ -90,23 +111,18 @@ class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String) state } - override def clearState(): Unit = { + override def teardown(): Unit = { state = None + fn.teardown() } - - override def description: String = descriptionMessage } -class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] { +class Emit[T](emit: T => Unit) extends SingleInputFunction[T, Unit] { override def process(value: T): TraversableOnce[Unit] = { emit(value) None } - override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = { - throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction") - } - override def description: String = "" } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala new file mode 100644 index 0000000..430d795 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala @@ -0,0 +1,287 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.scalaapi + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction +import org.apache.gearpump.streaming.dsl.plan._ +import org.apache.gearpump.streaming.dsl.plan.functions._ +import org.apache.gearpump.streaming.dsl.window.api._ +import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow} +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.util.Graph +import org.slf4j.{Logger, LoggerFactory} + +import scala.language.implicitConversions + +class Stream[T]( + private val graph: Graph[Op, OpEdge], private val thisNode: Op, + private val edge: Option[OpEdge] = None) { + + /** + * Returns a new stream by applying a flatMap function to each element + * and flatten the results. 
+ * + * @param fn flatMap function + * @param description The description message for this operation + * @return A new stream with type [R] + */ + def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = { + this.flatMap(FlatMapFunction(fn), description) + } + + /** + * Returns a new stream by applying a flatMap function to each element + * and flatten the results. + * + * @param fn flatMap function + * @param description The description message for this operation + * @return A new stream with type [R] + */ + def flatMap[R](fn: FlatMapFunction[T, R], description: String): Stream[R] = { + transform(new FlatMapper[T, R](fn, description)) + } + + /** + * Returns a new stream by applying a map function to each element. + * + * @param fn map function + * @return A new stream with type [R] + */ + def map[R](fn: T => R, description: String = "map"): Stream[R] = { + this.map(MapFunction(fn), description) + } + + /** + * Returns a new stream by applying a map function to each element. + * + * @param fn map function + * @return A new stream with type [R] + */ + def map[R](fn: MapFunction[T, R], description: String): Stream[R] = { + this.flatMap(FlatMapFunction(fn), description) + } + + /** + * Returns a new Stream keeping the elements that satisfy the filter function. + * + * @param fn filter function + * @return a new stream after filter + */ + def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = { + this.filter(FilterFunction(fn), description) + } + + /** + * Returns a new Stream keeping the elements that satisfy the filter function. + * + * @param fn filter function + * @return a new stream after filter + */ + def filter(fn: FilterFunction[T], description: String): Stream[T] = { + this.flatMap(FlatMapFunction(fn), description) + } + /** + * Returns a new stream by applying a reduce function over all the elements. 
+ * + * @param fn reduce function + * @param description description message for this operator + * @return a new stream after reduce + */ + def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = { + reduce(ReduceFunction(fn), description) + } + + /** + * Returns a new stream by applying a reduce function over all the elements. + * + * @param fn reduce function + * @param description description message for this operator + * @return a new stream after reduce + */ + def reduce(fn: ReduceFunction[T], description: String): Stream[T] = { + transform(new Reducer[T](fn, description)) + } + + private def transform[R](fn: SingleInputFunction[T, R]): Stream[R] = { + val op = ChainableOp(fn) + graph.addVertex(op) + graph.addEdge(thisNode, edge.getOrElse(Direct), op) + new Stream(graph, op) + } + + /** + * Log to task log file + */ + def log(): Unit = { + this.map(msg => { + LoggerFactory.getLogger("dsl").info(msg.toString) + msg + }, "log") + } + + /** + * Merges data from two stream into one + * + * @param other the other stream + * @return the merged stream + */ + def merge(other: Stream[T], description: String = "merge"): Stream[T] = { + val mergeOp = MergeOp(description, UserConfig.empty) + graph.addVertex(mergeOp) + graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp) + graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp) + new Stream[T](graph, mergeOp) + } + + /** + * Group by function (T => Group) + * + * For example, we have T type, People(name: String, gender: String, age: Int) + * groupBy[People](_.gender) will group the people by gender. + * + * You can append other combinators after groupBy + * + * For example, + * {{{ + * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..) 
+ * }}} + * + * @param fn Group by function + * @param parallelism Parallelism level + * @param description The description + * @return the grouped stream + */ + def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, + description: String = "groupBy"): Stream[T] = { + window(CountWindow.apply(1).accumulating) + .groupBy[GROUP](fn, parallelism, description) + } + + /** + * Window function + * + * @param win window definition + * @param description window description + * @return [[WindowStream]] where groupBy could be applied + */ + def window(win: Window, description: String = "window"): WindowStream[T] = { + new WindowStream[T](graph, edge, thisNode, win, description) + } + + /** + * Connects with a low level Processor(TaskDescription) + * + * @param processor a user defined processor + * @param parallelism parallelism level + * @return new stream after processing with type [R] + */ + def process[R]( + processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty, + description: String = "process"): Stream[R] = { + val processorOp = ProcessorOp(processor, parallelism, conf, description) + graph.addVertex(processorOp) + graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp) + new Stream[R](graph, processorOp, Some(Shuffle)) + } +} + +class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op, + window: Window, winDesc: String) { + + def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, + description: String = "groupBy"): Stream[T] = { + val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window) + val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism, + s"$winDesc.$description") + graph.addVertex(groupOp) + graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) + new Stream[T](graph, groupOp) + } +} + +class KVStream[K, V](stream: Stream[Tuple2[K, V]]) { + /** + * GroupBy key + * + * Applies to Stream[Tuple2[K,V]] + * + * @param parallelism the parallelism for this 
operation + * @return the new KV stream + */ + def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = { + stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey") + } + + /** + * Sum the value of the tuples + * + * Apply to Stream[Tuple2[K,V]], V must be of type Number + * + * For input (key, value1), (key, value2), will generate (key, value1 + value2) + * @param numeric the numeric operations + * @return the sum stream + */ + def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = { + stream.reduce(Stream.sumByKey[K, V](numeric), "sum") + } +} + +object Stream { + + def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = { + new Stream[T](graph, node, edge) + } + + def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1 + + def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] + = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2)) + + implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = { + new KVStream(stream) + } + + implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable { + def sink(dataSink: DataSink, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = { + implicit val sink = DataSinkOp(dataSink, parallelism, conf, description) + stream.graph.addVertex(sink) + stream.graph.addEdge(stream.thisNode, Shuffle, sink) + new Stream[T](stream.graph, sink) + } + } +} + +class LoggerSink[T] extends DataSink { + var logger: Logger = _ + + override def open(context: TaskContext): Unit = { + this.logger = context.logger + } + + override def write(message: Message): Unit = { + logger.info("logging message " + message.msg) + } + + override def close(): Unit = {} /* open() only borrows the task logger; nothing to release */ +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala new file mode 100644 index 0000000..d6eed2e --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.scalaapi + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.StreamApplication +import org.apache.gearpump.streaming.dsl.plan._ +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.Graph + +import scala.language.implicitConversions + +/** + * Example: + * {{{ + * val data = "This is a good start, bingo!! bingo!!" + * app.fromCollection(data.lines.toList). 
+ * // word => (word, count) + * flatMap(line => line.split("[\\s]+")).map((_, 1)). + * // (word, count1), (word, count2) => (word, count1 + count2) + * groupBy(kv => kv._1).reduce(sum(_, _)) + * + * val appId = context.submit(app) + * context.close() + * }}} + * + * @param name name of app + */ +class StreamApp( + name: String, system: ActorSystem, userConfig: UserConfig, + private val graph: Graph[Op, OpEdge]) { + + def this(name: String, system: ActorSystem, userConfig: UserConfig) = { + this(name, system, userConfig, Graph.empty[Op, OpEdge]) + } + + def plan(): StreamApplication = { + implicit val actorSystem = system + val planner = new Planner + val dag = planner.plan(graph) + StreamApplication(name, dag, userConfig) + } +} + +object StreamApp { + def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty) + : StreamApp = { + new StreamApp(name, context.system, userConfig) + } + + implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = { + streamApp.plan() + } + + implicit class Source(app: StreamApp) extends java.io.Serializable { + + def source[T](dataSource: DataSource, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { + implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description) + app.graph.addVertex(sourceOp) + new Stream[T](app.graph, sourceOp) + } + + def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { + this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) + } + } +} + +/** A test message source which generated message sequence repeatedly. 
*/ +class CollectionDataSource[T](seq: Seq[T]) extends DataSource { + private lazy val iterator: Iterator[T] = seq.iterator + + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def read(): Message = { + if (iterator.hasNext) { + Message(iterator.next(), Instant.now().toEpochMilli) + } else { + null + } + } + + override def close(): Unit = {} + + override def getWatermark: Instant = Instant.now() +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala new file mode 100644 index 0000000..f10a3db --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.scalaapi.functions + +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction} +import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction} + +import scala.collection.JavaConverters._ + +object FlatMapFunction { + + def apply[T, R](fn: JFlatMapFunction[T, R]): FlatMapFunction[T, R] = { + new FlatMapFunction[T, R] { + + override def setup(): Unit = { + fn.setup() + } + + override def apply(t: T): TraversableOnce[R] = { + fn.apply(t).asScala + } + + + override def teardown(): Unit = { + fn.teardown() + } + } + } + + def apply[T, R](fn: T => TraversableOnce[R]): FlatMapFunction[T, R] = { + new FlatMapFunction[T, R] { + override def apply(t: T): TraversableOnce[R] = { + fn(t) + } + } + } + + def apply[T, R](fn: MapFunction[T, R]): FlatMapFunction[T, R] = { + new FlatMapFunction[T, R] { + + override def setup(): Unit = { + fn.setup() + } + + override def apply(t: T): TraversableOnce[R] = { + Option(fn(t)) + } + + override def teardown(): Unit = { + fn.teardown() + } + } + } + + def apply[T, R](fn: FilterFunction[T]): FlatMapFunction[T, T] = { + new FlatMapFunction[T, T] { + + override def setup(): Unit = { + fn.setup() + } + + override def apply(t: T): TraversableOnce[T] = { + if (fn(t)) { + Option(t) + } else { + None + } + } + + override def teardown(): Unit = { + fn.teardown() + } + } + } +} + +/** + * Transforms one input into zero or more outputs of possibly different types. + * This Scala version of FlatMapFunction returns a TraversableOnce. 
+ * + * @param T Input value type + * @param R Output value type + */ +abstract class FlatMapFunction[T, R] extends SerializableFunction { + + def apply(t: T): TraversableOnce[R] + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala new file mode 100644 index 0000000..ab88bf1 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.scalaapi.functions + +/** + * Superclass for all user defined function interfaces. + * This ensures all functions are serializable and provides common methods + * like setup and teardown. Users should not extend this class directly + * but subclasses like [[FlatMapFunction]]. 
+ */ +abstract class SerializableFunction extends java.io.Serializable { + + def setup(): Unit = {} + + def teardown(): Unit = {} + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index e35f085..c13a4fb 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -23,9 +23,8 @@ import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction import org.apache.gearpump.streaming.task.{Task, TaskContext} -class TransformTask[IN, OUT]( - operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext, - userConf: UserConfig) extends Task(taskContext, userConf) { +class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]], + taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { def this(taskContext: TaskContext, userConf: UserConfig) = { this(userConf.getValue[SingleInputFunction[IN, OUT]]( http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index d87a9e4..223a4af 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -28,7 +28,7 @@ import com.gs.collections.impl.map.mutable.UnifiedMap import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap import com.gs.collections.impl.set.mutable.UnifiedSet import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction} +import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, SingleInputFunction} import org.apache.gearpump.streaming.dsl.window.api.Discarding import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.LogUtil @@ -39,7 +39,6 @@ trait WindowRunner { def process(message: Message): Unit def trigger(time: Instant): Unit - } object DefaultWindowRunner { @@ -59,7 +58,6 @@ class DefaultWindowRunner[IN, GROUP, OUT]( private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]] private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]] - override def process(message: Message): Unit = { val (group, buckets) = groupBy.groupBy(message) buckets.foreach { bucket => @@ -72,8 +70,11 @@ class DefaultWindowRunner[IN, GROUP, OUT]( inputs.add(message.msg.asInstanceOf[IN]) windowGroups.put(wg, inputs) } - groupFns.putIfAbsent(group, - userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get) + if (!groupFns.containsKey(group)) { + val fn = userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get + fn.setup() + groupFns.put(group, fn) + } } override def trigger(time: Instant): Unit = { @@ -88,8 +89,7 @@ class DefaultWindowRunner[IN, GROUP, OUT]( wgs.forEach(new Procedure[WindowGroup[GROUP]] { override def value(each: WindowGroup[GROUP]): Unit = { val inputs = windowGroups.remove(each) - val reduceFn = groupFns.get(each.group) - .andThen[Unit](new EmitFunction[OUT](emitResult(_, time))) + val reduceFn = AndThen(groupFns.get(each.group), new 
Emit[OUT](emitResult(_, time))) inputs.forEach(new Procedure[IN] { override def value(t: IN): Unit = { // .toList forces eager evaluation @@ -99,7 +99,7 @@ class DefaultWindowRunner[IN, GROUP, OUT]( // .toList forces eager evaluation reduceFn.finish().toList if (groupBy.window.accumulationMode == Discarding) { - reduceFn.clearState() + reduceFn.teardown() } } }) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala deleted file mode 100644 index db4db93..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.dsl - -import akka.actor.ActorSystem -import org.apache.gearpump.cluster.TestUtil -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.streaming.partitioner.PartitionerDescription -import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} -import org.apache.gearpump.streaming.source.DataSourceTask -import org.apache.gearpump.util.Graph -import org.mockito.Mockito.when -import org.scalatest._ -import org.scalatest.mock.MockitoSugar - -import scala.concurrent.Await -import scala.concurrent.duration.Duration -class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - - implicit var system: ActorSystem = _ - - override def beforeAll(): Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "be able to generate multiple new streams" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - - val dsl = StreamApp("dsl", context) - dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]] - dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]] - - val application = dsl.plan() - application shouldBe a [StreamApplication] - application.name shouldBe "dsl" - val dag = application.userConfig - .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get - dag.vertices.size shouldBe 2 - dag.vertices.foreach { processor => - processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName - if (processor.description == "A") { - processor.parallelism shouldBe 2 - } else if (processor.description == "B") { - processor.parallelism shouldBe 3 - } else { - fail(s"undefined source ${processor.description}") - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala deleted file mode 100644 index 8def61e..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.dsl - -import akka.actor._ -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription} -import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} -import org.apache.gearpump.streaming.dsl.StreamSpec.Join -import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner -import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} -import org.apache.gearpump.streaming.source.DataSourceTask -import org.apache.gearpump.streaming.task.{Task, TaskContext} -import org.apache.gearpump.util.Graph -import org.apache.gearpump.util.Graph._ -import org.mockito.Mockito.when -import org.scalatest._ -import org.scalatest.mock.MockitoSugar - -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import scala.util.{Either, Left, Right} - -class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - - implicit var system: ActorSystem = _ - - override def beforeAll(): Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "translate the DSL to a DAG" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - - val dsl = StreamApp("dsl", context) - - val data = - """ - five four three two one - five four three two - five four three - five four - five - """ - val stream = dsl.source(data.lines.toList, 1, ""). - flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty). - map(word => (word, 1)). - groupBy(_._1, parallelism = 2). - reduce((left, right) => (left._1, left._2 + right._2)). 
- map[Either[(String, Int), String]](Left(_)) - - val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_)) - stream.merge(query).process[(String, Int)](classOf[Join], 1) - - val app: StreamApplication = dsl.plan() - val dag = app.userConfig - .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get - - val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => - edge.partitionerFactory.partitioner.getClass.getName - } - val expectedDagTopology = getExpectedDagTopology - - dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet - dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet - } - - private def getExpectedDagTopology: Graph[String, String] = { - val source = classOf[DataSourceTask[_, _]].getName - val group = classOf[CountTriggerTask[_, _]].getName - val merge = classOf[TransformTask[_, _]].getName - val join = classOf[Join].getName - - val hash = classOf[HashPartitioner].getName - val groupBy = classOf[GroupByPartitioner[_, _]].getName - val colocation = classOf[CoLocationPartitioner].getName - - val expectedDagTopology = Graph( - source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join, - source ~ hash ~> merge - ) - expectedDagTopology - } -} - -object StreamSpec { - - class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - - var query: String = _ - - override def onNext(msg: Message): Unit = { - msg.msg match { - case Left(wordCount: (String @unchecked, Int @unchecked)) => - if (query != null && wordCount._1 == query) { - taskContext.output(new Message(wordCount)) - } - - case Right(query: String) => - this.query = query - } - } - } -}
