[GEARPUMP-311] refactor state management
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/fe410304 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/fe410304 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/fe410304 Branch: refs/heads/state Commit: fe4103046ae3bddacce2cd7a17270a994a494dbd Parents: 50316ce Author: vinoyang <[email protected]> Authored: Fri Jul 21 20:32:10 2017 +0800 Committer: vinoyang <[email protected]> Committed: Sat Jul 22 17:16:56 2017 +0800 ---------------------------------------------------------------------- .../state/refactor/ProduceProcessor.scala | 53 +++++ .../examples/state/refactor/SumProcessor.scala | 92 ++++++++ .../wordcountjava/dsl/refactor/WordCount.java | 211 +++++++++++++++++++ .../wordcount/dsl/refactor/WordCount.scala | 151 +++++++++++++ project/BuildExamples.scala | 36 +++- .../streaming/refactor/javaapi/Processor.java | 143 +++++++++++++ .../api/functions/MapWithStateFunction.scala | 55 +++++ .../refactor/dsl/javaapi/JavaStreamApp.scala | 49 +++++ .../functions/FlatMapWithStateFunction.scala | 48 +++++ .../streaming/refactor/dsl/plan/Planner.scala | 90 ++++++++ .../dsl/plan/functions/FunctionRunner.scala | 135 ++++++++++++ .../refactor/dsl/scalaapi/StreamApp.scala | 116 ++++++++++ .../functions/FlatMapWithStateFunction.scala | 120 +++++++++++ .../refactor/dsl/task/GroupByTask.scala | 89 ++++++++ .../streaming/refactor/dsl/task/TaskUtil.scala | 65 ++++++ .../refactor/dsl/task/TransformTask.scala | 55 +++++ .../dsl/window/impl/ReduceFnRunner.scala | 29 +++ .../refactor/dsl/window/impl/WindowRunner.scala | 160 ++++++++++++++ .../refactor/sink/DataSinkProcessor.scala | 36 ++++ .../streaming/refactor/sink/DataSinkTask.scala | 52 +++++ .../refactor/source/DataSourceProcessor.scala | 55 +++++ .../refactor/source/DataSourceTask.scala | 91 ++++++++ .../refactor/state/RuntimeContext.scala | 35 +++ 
.../streaming/refactor/state/StatefulTask.scala | 174 +++++++++++++++ 24 files changed, 2136 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/ProduceProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/ProduceProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/ProduceProcessor.scala new file mode 100644 index 0000000..11f99e2 --- /dev/null +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/ProduceProcessor.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.state.refactor + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StatefulTask} +import org.apache.gearpump.streaming.task.TaskContext + +/** + * a produce processor for generating a specific num sequence + */ +class ProduceProcessor(taskContext: TaskContext, conf: UserConfig) + extends StatefulTask(taskContext, conf) { + + override def open(runtimeContext: RuntimeContext): Unit = {} + + override def invoke(message: Message): Unit = { + message.value match { + case msgBytes: Array[Byte] => { + val msgStr = new String(msgBytes) + LOG.info("got total sequence num : {}", msgStr) + + val n: Int = Integer.valueOf(msgStr) + var sumResult: Long = 0 + for (i <- 1 to n) { + taskContext.output(Message(String.valueOf(i).getBytes)) + sumResult = sumResult + i + } + + LOG.info(" total sum result : {}", sumResult) + } + } + } + + override def close(runtimeContext: RuntimeContext): Unit = {} +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/SumProcessor.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/SumProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/SumProcessor.scala new file mode 100644 index 0000000..438b337 --- /dev/null +++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/SumProcessor.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.state.refactor + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.refactor.coder._ +import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, ValueState} +import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StateNamespaces, StateTags, StatefulTask} +import org.apache.gearpump.streaming.task.TaskContext + +/** + * a sum processor that continuously sums messages from Kafka + * it is an example of using the state API and verifying the state exactly-once guarantee + */ +class SumProcessor(taskContext: TaskContext, conf: UserConfig) + extends StatefulTask(taskContext, conf) { + + private implicit val valueStateTag = "tag1" + private implicit val counterStateTag = "tag2" + + private var stateInternals: Option[StateInternals] = None + private var valueState: Option[ValueState[java.lang.Long]] = None + private var counterState: Option[ValueState[java.lang.Long]] = None + + override def open(stateContext: RuntimeContext): Unit = { + stateInternals = Some(stateContext.getStateInternals(StringUtf8Coder.of, "partitionedKey")) + valueState = Some( + stateInternals.get.state( + StateNamespaces.global, StateTags.value(valueStateTag, VarLongCoder.of)) + ) + + counterState = Some( + stateInternals.get.state( + StateNamespaces.global, 
StateTags.value(counterStateTag, VarLongCoder.of)) + ) + + // init + if (valueState.get.read == null) { + LOG.info("[open] value state current is null, init it to 0") + valueState.get.write(0L) + } else { + LOG.info("[open] load from snapshot value state current is : {}", valueState.get.read) + } + + if (counterState.get.read == null) { + LOG.info("[open] counter state current is null, init it to 0") + counterState.get.write(0L) + } else { + LOG.info("[open] load from snapshot counter state current is : {}", counterState.get.read) + } + } + + override def invoke(message: Message): Unit = { + message.value match { + case numberByte: Array[Byte] => { + val number = new String(numberByte) + val oldVal = valueState.get.read + valueState.get.write(oldVal + java.lang.Long.valueOf(number)) + + val oldCounter = counterState.get.read + counterState.get.write(oldCounter + 1) + } + + case other => LOG.error("received unsupported message {}", other) + } + + if (counterState.get.read % 1000000 == 0) { + LOG.info("counter state is : {}", counterState.get.read) + LOG.info("value state is : {}", valueState.get.read) + } + } + + override def close(stateContext: RuntimeContext): Unit = {} + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/refactor/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/refactor/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/refactor/WordCount.java new file mode 100644 index 0000000..900c35a --- /dev/null +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/refactor/WordCount.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.examples.wordcountjava.dsl.refactor; + +import com.typesafe.config.Config; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.gearpump.DefaultMessage; +import org.apache.gearpump.Message; +import org.apache.gearpump.cluster.ClusterConfig; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.cluster.client.ClientContext; +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction; +import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory; +import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation; +import org.apache.gearpump.streaming.refactor.coder.StringUtf8Coder; +import org.apache.gearpump.streaming.refactor.coder.VarLongCoder; +import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction; +import org.apache.gearpump.streaming.refactor.dsl.javaapi.JavaStream; +import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction; +import org.apache.gearpump.streaming.refactor.dsl.javaapi.JavaStreamApp; +import org.apache.gearpump.streaming.refactor.dsl.javaapi.functions.FlatMapWithStateFunction; +import 
org.apache.gearpump.streaming.refactor.state.RuntimeContext; +import org.apache.gearpump.streaming.refactor.state.StateNamespaces; +import org.apache.gearpump.streaming.refactor.state.StateTags; +import org.apache.gearpump.streaming.refactor.state.api.StateInternals; +import org.apache.gearpump.streaming.refactor.state.api.ValueState; +import org.apache.gearpump.streaming.source.DataSource; +import org.apache.gearpump.streaming.state.impl.PersistentStateConfig; +import org.apache.gearpump.streaming.task.TaskContext; +import org.apache.hadoop.conf.Configuration; +import scala.Tuple2; + +import java.time.Instant; +import java.util.Arrays; +import java.util.Iterator; + +/** Java version of WordCount with high level DSL API */ +public class WordCount { + + public static void main(String[] args) throws InterruptedException { + main(ClusterConfig.defaultConfig(), args); + } + + public static void main(Config akkaConf, String[] args) throws InterruptedException { + ClientContext context = new ClientContext(akkaConf); + + Configuration hadoopConfig = new Configuration(); + HadoopCheckpointStoreFactory checkpointStoreFactory = new HadoopCheckpointStoreFactory( + "MessageConsume", hadoopConfig, + // Rotates on 1MB + new FileSizeRotation(1000000) + ); + UserConfig taskConfig = UserConfig.empty() + .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE(), true) + .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS(), 1000L) + .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY(), + checkpointStoreFactory, + context.system() + ); + + JavaStreamApp app = new JavaStreamApp("JavaDSL", context, taskConfig); + + JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! 
bingo!!"), + 1, UserConfig.empty(), "source"); + + JavaStream<String> words = sentence.flatMapWithState(new StatefulSplitFunction(), "flatMap"); + + JavaStream<Tuple2<String, Integer>> ones = words.mapWithState(new StatefulMapFunction(), "map"); + + JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new TupleKey(), 1, "groupBy"); + + JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new Count(), "reduce"); + + wordcount.log(); + + app.submit().waitUntilFinish(); + context.close(); + } + + private static class StringSource implements DataSource { + + private final String str; + + StringSource(String str) { + this.str = str; + } + + @Override + public void open(TaskContext context, Instant startTime) { + } + + @Override + public Message read() { + return new DefaultMessage(str, Instant.now()); + } + + @Override + public void close() { + } + + @Override + public Instant getWatermark() { + return Instant.now(); + } + } + + private static class StatefulSplitFunction extends FlatMapWithStateFunction<String, String> { + + private static final Log logger = LogFactory.getLog(StatefulSplitFunction.class); + + private String counterStateTag = "tag1"; + + private StateInternals stateInternal; + private ValueState<Long> counterState; + + @Override + public void setup(RuntimeContext runtimeContext) { + logger.info("StatefulSplitFunction setup."); + stateInternal = runtimeContext.getStateInternals(StringUtf8Coder.of(), "partitionedKey"); + + counterState = stateInternal.state(StateNamespaces.global(), StateTags.value(counterStateTag, VarLongCoder.of())); + + if (counterState.read() == null) { + counterState.write(0L); + } + } + + @Override + public Iterator<String> flatMap(String s) { + long oldVal = counterState.read(); + logger.info("old value in flatMap : " + oldVal); + counterState.write(oldVal + 1); + + return Arrays.asList(s.split("\\s+")).iterator(); + } + + @Override + public void teardown(RuntimeContext runtimeContext) { + 
logger.info("StatefulSplitFunction teardown."); + } + } + + private static class StatefulMapFunction extends MapWithStateFunction<String, Tuple2<String, Integer>> { + + private static final Log logger = LogFactory.getLog(StatefulMapFunction.class); + + private String counterStateTag = "tag2"; + + private StateInternals stateInternal; + private ValueState<Long> counterState; + + @Override + public void setup(RuntimeContext runtimeContext) { + logger.info("StatefulMapFunction setup."); + stateInternal = runtimeContext.getStateInternals(StringUtf8Coder.of(), "partitionedKey"); + + counterState = stateInternal.state(StateNamespaces.global(), StateTags.value(counterStateTag, VarLongCoder.of())); + + if (counterState.read() == null) { + counterState.write(0L); + } + } + + @Override + public Tuple2<String, Integer> map(String s) { + long oldVal = counterState.read(); + logger.info("old value in map method : " + oldVal); + counterState.write(oldVal + 1); + + return new Tuple2<>(s, 1); + } + + @Override + public void teardown(RuntimeContext runtimeContext) { + logger.info("StatefulMapFunction teardown."); + } + } + + private static class Count extends ReduceFunction<Tuple2<String, Integer>> { + + @Override + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { + return new Tuple2<>(t1._1(), t1._2() + t2._2()); + } + } + + private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> { + + @Override + public String groupBy(Tuple2<String, Integer> tuple) { + return tuple._1(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala new file mode 100644 index 0000000..a9919e2 --- /dev/null +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.examples.wordcount.dsl.refactor + +import org.apache.commons.logging.{Log, LogFactory} +import org.apache.gearpump.streaming.refactor.coder.{StringUtf8Coder, VarLongCoder} +import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction +import org.apache.gearpump.streaming.refactor.dsl.scalaapi.StreamApp +import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction +import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, ValueState} +import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StateNamespaces, StateTags} +import org.apache.hadoop.conf.Configuration +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} +import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory +import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation +import org.apache.gearpump.streaming.state.impl.PersistentStateConfig +import org.apache.gearpump.streaming.{Processor, StreamApplication} +import org.apache.gearpump.util.{AkkaApp, Graph} + +/** + * + */ +object WordCount extends AkkaApp with ArgumentsParser { + + override val options: Array[(String, CLIOption[Any])] = Array.empty + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val context = ClientContext(akkaConf) + + val hadoopConfig = new Configuration + val checkpointStoreFactory = new HadoopCheckpointStoreFactory("MessageConsume", hadoopConfig, + // Rotates on 1MB + new FileSizeRotation(1000000)) + val taskConfig = UserConfig.empty + .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true) + .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L) + .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, + checkpointStoreFactory)(context.system) + + val app = StreamApp("dsl", context, 
taskConfig) + val data = "This is a good start, bingo!! bingo!!" + app.source(data.lines.toList, 1, "source"). + // word => (word, count) + flatMapWithState(new StatefulFlatMapFunction(), "a stateful flatmap udf"). + mapWithState(new StatefulMapFunction(), ""). + // (word, count1), (word, count2) => (word, count1 + count2) + groupByKey().sum.log + + context.submit(app).waitUntilFinish() + context.close() + } + + + private class StatefulFlatMapFunction + extends FlatMapWithStateFunction[String, String] { + + private val logger: Log = LogFactory.getLog(getClass) + + private implicit val counterStateTag = "tag1" + + private var stateInternals: Option[StateInternals] = None + private var counterState: Option[ValueState[java.lang.Long]] = None + + override def setup(runtimeContext: RuntimeContext): Unit = { + logger.info("StatefulFlatMapFunction setup.") + stateInternals = Some(runtimeContext.getStateInternals(StringUtf8Coder.of, "partitionedKey")) + + counterState = Some( + stateInternals.get.state( + StateNamespaces.global, StateTags.value(counterStateTag, VarLongCoder.of)) + ) + + // init + if (counterState.get.read == null) { + counterState.get.write(0L) + } + } + + + override def flatMap(t: String): TraversableOnce[String] = { + val oldVal = counterState.get.read + logger.info("old value in flatmap : " + oldVal) + counterState.get.write(oldVal + 1) + + t.split("[\\s]+") + } + + override def teardown(runtimeContext: RuntimeContext): Unit = { + logger.info("StatefulFlatMapFunction teardown.") + } + + } + + private class StatefulMapFunction + extends MapWithStateFunction[String, (String, Int)] { + + private val logger: Log = LogFactory.getLog(getClass) + + private implicit val counterStateTag = "tag2" + + private var stateInternals: Option[StateInternals] = None + private var counterState: Option[ValueState[java.lang.Long]] = None + + override def setup(runtimeContext: RuntimeContext): Unit = { + logger.info("StatefulMapFunction setup.") + stateInternals = 
Some(runtimeContext.getStateInternals(StringUtf8Coder.of, "partitionedKey")) + + counterState = Some( + stateInternals.get.state( + StateNamespaces.global, StateTags.value(counterStateTag, VarLongCoder.of)) + ) + + // init + if (counterState.get.read == null) { + counterState.get.write(0L) + } + } + + override def map(t: String): (String, Int) = { + val oldVal = counterState.get.read + logger.info("old value in map : " + oldVal) + counterState.get.write(oldVal + 1) + + (t, 1) + } + + override def teardown(runtimeContext: RuntimeContext): Unit = { + logger.info("StatefulMapFunction teardown.") + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/project/BuildExamples.scala ---------------------------------------------------------------------- diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala index d7390b5..57210c0 100644 --- a/project/BuildExamples.scala +++ b/project/BuildExamples.scala @@ -62,14 +62,42 @@ object BuildExamples extends sbt.Build { lazy val wordcountJava = Project( id = "gearpump-examples-wordcountjava", base = file("examples/streaming/wordcount-java"), - settings = exampleSettings("org.apache.gearpump.streaming.examples.wordcountjava.WordCount") - ).dependsOn(core % "provided", streaming % "test->test; provided") + settings = exampleSettings("org.apache.gearpump.streaming.examples.wordcountjava.WordCount") ++ + Seq( + libraryDependencies ++= Seq( + "org.apache.hadoop" % "hadoop-common" % hadoopVersion + exclude("org.mortbay.jetty", "jetty-util") + exclude("org.mortbay.jetty", "jetty") + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("tomcat", "jasper-runtime") + exclude("commons-beanutils", "commons-beanutils-core") + exclude("commons-beanutils", "commons-beanutils") + exclude("asm", "asm") + exclude("org.ow2.asm", "asm"), + "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion + ) + ) + ).dependsOn(core % "provided", streaming % "test->test; provided", external_hadoopfs) lazy 
val wordcount = Project( id = "gearpump-examples-wordcount", base = file("examples/streaming/wordcount"), - settings = exampleSettings("org.apache.gearpump.streaming.examples.wordcount.WordCount") - ).dependsOn(core % "provided", streaming % "test->test; provided") + settings = exampleSettings("org.apache.gearpump.streaming.examples.wordcount.WordCount") ++ + Seq( + libraryDependencies ++= Seq( + "org.apache.hadoop" % "hadoop-common" % hadoopVersion + exclude("org.mortbay.jetty", "jetty-util") + exclude("org.mortbay.jetty", "jetty") + exclude("org.fusesource.leveldbjni", "leveldbjni-all") + exclude("tomcat", "jasper-runtime") + exclude("commons-beanutils", "commons-beanutils-core") + exclude("commons-beanutils", "commons-beanutils") + exclude("asm", "asm") + exclude("org.ow2.asm", "asm"), + "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion + ) + ) + ).dependsOn(core % "provided", streaming % "test->test; provided", external_hadoopfs) lazy val sol = Project( id = "gearpump-examples-sol", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/java/org/apache/gearpump/streaming/refactor/javaapi/Processor.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/javaapi/Processor.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/javaapi/Processor.java new file mode 100644 index 0000000..7d85b09 --- /dev/null +++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/javaapi/Processor.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.javaapi; + +import akka.actor.ActorSystem; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.streaming.javaapi.Task; +import org.apache.gearpump.streaming.sink.DataSink; +import org.apache.gearpump.streaming.refactor.sink.DataSinkProcessor; +import org.apache.gearpump.streaming.refactor.sink.DataSinkTask; +import org.apache.gearpump.streaming.source.DataSource; +import org.apache.gearpump.streaming.refactor.source.DataSourceProcessor; +import org.apache.gearpump.streaming.refactor.source.DataSourceTask; + +/** + * Java version of Processor + * + * See {@link org.apache.gearpump.streaming.Processor} + */ +public class Processor<T extends org.apache.gearpump.streaming.task.Task> implements org.apache.gearpump.streaming.Processor<T> { + private Class<T> _taskClass; + private int _parallelism = 1; + private String _description = ""; + private UserConfig _userConf = UserConfig.empty(); + + public Processor(Class<T> taskClass) { + this._taskClass = taskClass; + } + + public Processor(Class<T> taskClass, int parallelism) { + this._taskClass = taskClass; + this._parallelism = parallelism; + } + + /** + * Creates a Sink Processor + * + * @param dataSink the data sink itself + * @param parallelism the parallelism of this processor + * @param description the description for this processor + * @param taskConf the configuration for this processor + * @param system actor system + * @return the new created sink processor + */ + public static Processor<DataSinkTask> sink(DataSink 
dataSink, int parallelism, String description, UserConfig taskConf, ActorSystem system) { + org.apache.gearpump.streaming.Processor<DataSinkTask> p = DataSinkProcessor.apply(dataSink, parallelism, description, taskConf, system); + return new Processor(p); + } + + /** + * Creates a Source Processor + * + * @param source the data source itself + * @param parallelism the parallelism of this processor + * @param description the description of this processor + * @param taskConf the configuration of this processor + * @param system actor system + * @return the new created source processor + */ + public static Processor<DataSourceTask> source(DataSource source, int parallelism, String description, UserConfig taskConf, ActorSystem system) { + org.apache.gearpump.streaming.Processor<DataSourceTask<Object, Object>> p = + DataSourceProcessor.apply(source, parallelism, description, taskConf, system); + return new Processor(p); + } + + public Processor(org.apache.gearpump.streaming.Processor<T> processor) { + this._taskClass = (Class) (processor.taskClass()); + this._parallelism = processor.parallelism(); + this._description = processor.description(); + this._userConf = processor.taskConf(); + } + + /** + * Creates a general processor with user specified task logic. 
+ * + * @param taskClass task implementation class of this processor (shall be a derived class from {@link Task} + * @param parallelism, how many initial tasks you want to use + * @param description, some text to describe this processor + * @param taskConf, Processor specific configuration + */ + public Processor(Class<T> taskClass, int parallelism, String description, UserConfig taskConf) { + this._taskClass = taskClass; + this._parallelism = parallelism; + this._description = description; + this._userConf = taskConf; + } + + public Processor<T> withParallelism(int parallel) { + return new Processor<T>(_taskClass, parallel, _description, _userConf); + } + + public Processor<T> withDescription(String desc) { + return new Processor<T>(_taskClass, _parallelism, desc, _userConf); + } + + public Processor<T> withConfig(UserConfig conf) { + return new Processor<T>(_taskClass, _parallelism, _description, conf); + } + + @Override + public int parallelism() { + return _parallelism; + } + + @Override + public UserConfig taskConf() { + return _userConf; + } + + @Override + public String description() { + return _description; + } + + @Override + public Class<? 
extends org.apache.gearpump.streaming.task.Task> taskClass() { + return _taskClass; + } + + /** + * reference equal + */ + @Override + public boolean equals(Object obj) { + return (this == obj); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/api/functions/MapWithStateFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/api/functions/MapWithStateFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/api/functions/MapWithStateFunction.scala new file mode 100644 index 0000000..6724ab5 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/api/functions/MapWithStateFunction.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.refactor.dsl.api.functions + +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction +import org.apache.gearpump.streaming.refactor.state.RuntimeContext + +object MapWithStateFunction { + + def apply[T, R](fn: T => R): MapWithStateFunction[T, R] = { + new MapWithStateFunction[T, R] { + override def map(t: T): R = { + fn(t) + } + } + } + +} + +/** + * map function support state + */ +abstract class MapWithStateFunction[T, R] extends MapFunction[T, R] { + + final override def setup(): Unit = { + throw new UnsupportedOperationException("please call or override " + + "setup(runtimeContext: RuntimeContext) .") + } + + final override def teardown(): Unit = { + throw new UnsupportedOperationException("please call or override " + + "teardown(runtimeContext: RuntimeContext) ") + } + + def setup(runtimeContext: RuntimeContext): Unit = {} + + def teardown(runtimeContext: RuntimeContext): Unit = {} + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStreamApp.scala new file mode 100644 index 0000000..5a88c17 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStreamApp.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.refactor.dsl.javaapi

import java.util.Collection

import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.{ClientContext, RunningApplication}
import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
import org.apache.gearpump.streaming.refactor.dsl.scalaapi.StreamApp
import org.apache.gearpump.streaming.source.DataSource

import scala.collection.JavaConverters._

/** Java-friendly entry point for assembling a refactored DSL streaming application. */
class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) {

  private val streamApp = StreamApp(name, context, userConfig)

  /** Creates a source stream backed by an in-memory Java collection. */
  def source[T](collection: Collection[T], parallelism: Int,
      conf: UserConfig, description: String): JavaStream[T] = {
    source(new CollectionDataSource(collection.asScala.toSeq), parallelism, conf, description)
  }

  /** Creates a source stream backed by a generic [[DataSource]]. */
  def source[T](dataSource: DataSource, parallelism: Int,
      conf: UserConfig, description: String): JavaStream[T] = {
    val stream = streamApp.source[T](dataSource, parallelism, conf, description)
    new JavaStream[T](stream)
  }

  /** Submits the assembled application and returns its running handle. */
  def submit(): RunningApplication = context.submit(streamApp)
}
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/functions/FlatMapWithStateFunction.scala new file mode 100644 index 0000000..4ed48ab --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/functions/FlatMapWithStateFunction.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.refactor.dsl.javaapi.functions + +import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction +import org.apache.gearpump.streaming.refactor.state.RuntimeContext + +/** + * Transforms one input into zero or more outputs of possibly different types. + * This Java version of FlatMapFunction returns a java.util.Iterator. 
+ * + * @param T Input value type + * @param R Output value type + */ +abstract class FlatMapWithStateFunction[T, R] extends FlatMapFunction[T, R] { + + def flatMap(t: T): java.util.Iterator[R] + + final override def setup(): Unit = { + throw new UnsupportedOperationException("please call or override " + + "setup(runtimeContext: RuntimeContext) ") + } + + final override def teardown(): Unit = { + throw new UnsupportedOperationException("please call or override " + + "teardown(runtimeContext: RuntimeContext) ") + } + + def setup(runtimeContext: RuntimeContext): Unit = {} + + def teardown(runtimeContext: RuntimeContext): Unit = {} + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/Planner.scala new file mode 100644 index 0000000..03cdf43 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/Planner.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.dsl.plan + +import akka.actor.ActorSystem +import org.apache.gearpump.streaming.Processor +import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner, HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.refactor.dsl.plan.functions._ +import org.apache.gearpump.streaming.task.Task +import org.apache.gearpump.util.Graph + +class Planner { + + /** + * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low + * level Graph API. + */ + def plan(dag: Graph[Op, OpEdge]) + (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = { + + val graph = optimize(dag) + graph.mapEdge { (_, edge, node2) => + edge match { + case Shuffle => + node2 match { + case op: GroupByOp[_, _] => + new GroupByPartitioner(op.groupBy) + case _ => new HashPartitioner + } + case Direct => + new CoLocationPartitioner + } + }.mapVertex(_.toProcessor) + } + + private def optimize(dag: Graph[Op, OpEdge]) + (implicit system: ActorSystem): Graph[Op, OpEdge] = { + val graph = dag.copy + val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse + for (node <- nodes) { + val outGoingEdges = graph.outgoingEdgesOf(node) + for (edge <- outGoingEdges) { + merge(graph, edge._1, edge._3) + } + } + graph + } + + private def merge(graph: Graph[Op, OpEdge], node1: Op, node2: Op) + (implicit system: ActorSystem): Unit = { + if (graph.outDegreeOf(node1) == 1 && + graph.inDegreeOf(node2) == 1 && + // For processor node, we don't allow it to merge with downstream operators + !node1.isInstanceOf[ProcessorOp[_ <: Task]] && + !node2.isInstanceOf[ProcessorOp[_ <: Task]]) { + val (_, edge, _) = graph.outgoingEdgesOf(node1).head + if (edge == Direct) { + val chainedOp = node1.chain(node2) + graph.addVertex(chainedOp) + for (incomingEdge <- 
graph.incomingEdgesOf(node1)) { + graph.addEdge(incomingEdge._1, incomingEdge._2, chainedOp) + } + + for (outgoingEdge <- graph.outgoingEdgesOf(node2)) { + graph.addEdge(chainedOp, outgoingEdge._2, outgoingEdge._3) + } + + // Remove the old vertex + graph.removeVertex(node1) + graph.removeVertex(node2) + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/functions/FunctionRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/functions/FunctionRunner.scala new file mode 100644 index 0000000..27f4265 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/functions/FunctionRunner.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.refactor.dsl.plan.functions + +import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction +import org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction +import org.apache.gearpump.streaming.refactor.state.RuntimeContext + +/** + * Interface to invoke SerializableFunction methods + * + * @param IN input value type + * @param OUT output value type + */ +sealed trait FunctionRunner[IN, OUT] extends java.io.Serializable { + + def setup(runtimeContext: RuntimeContext): Unit = {} + + def process(value: IN): TraversableOnce[OUT] + + def finish(): TraversableOnce[OUT] = None + + def teardown(runtimeContext: RuntimeContext): Unit = {} + + def description: String +} + +case class AndThen[IN, MIDDLE, OUT](first: FunctionRunner[IN, MIDDLE], + second: FunctionRunner[MIDDLE, OUT]) + extends FunctionRunner[IN, OUT] { + + override def setup(runtimeContext: RuntimeContext): Unit = { + first.setup(runtimeContext) + second.setup(runtimeContext) + } + + override def process(value: IN): TraversableOnce[OUT] = { + first.process(value).flatMap(second.process) + } + + override def finish(): TraversableOnce[OUT] = { + val firstResult = first.finish().flatMap(second.process) + if (firstResult.isEmpty) { + second.finish() + } else { + firstResult + } + } + + override def teardown(runtimeContext: RuntimeContext): Unit = { + first.teardown(runtimeContext) + second.teardown(runtimeContext) + } + + override def description: String = { + Option(first.description).flatMap { description => + Option(second.description).map(description + "." 
+ _) + }.orNull + } +} + +class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String) + extends FunctionRunner[IN, OUT] { + + override def setup(runtimeContext: RuntimeContext): Unit = { + if (fn.isInstanceOf[FlatMapWithStateFunction[IN, OUT]]) { + fn.asInstanceOf[FlatMapWithStateFunction[IN, OUT]].setup(runtimeContext) + } else { + fn.setup() + } + } + + override def process(value: IN): TraversableOnce[OUT] = { + fn.flatMap(value) + } + + override def teardown(runtimeContext: RuntimeContext): Unit = { + if (fn.isInstanceOf[FlatMapWithStateFunction[IN, OUT]]) { + fn.asInstanceOf[FlatMapWithStateFunction[IN, OUT]].teardown(runtimeContext) + } else { + fn.teardown() + } + } +} + +class FoldRunner[T, A](fn: FoldFunction[T, A], val description: String) + extends FunctionRunner[T, A] { + + private var state: Option[A] = None + + override def setup(runtimeContext: RuntimeContext): Unit = { + // TODO + fn.setup() + state = Option(fn.init) + } + + override def process(value: T): TraversableOnce[A] = { + state = state.map(fn.fold(_, value)) + None + } + + override def finish(): TraversableOnce[A] = { + state + } + + override def teardown(runtimeContext: RuntimeContext): Unit = { + // TODO + state = None + fn.teardown() + } +} + +class DummyRunner[T] extends FunctionRunner[T, T] { + + override def process(value: T): TraversableOnce[T] = Option(value) + + override def description: String = "" +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/StreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/StreamApp.scala new file mode 100644 index 0000000..a8dd727 --- /dev/null +++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/StreamApp.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.dsl.scalaapi + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.StreamApplication +import org.apache.gearpump.streaming.refactor.dsl.plan._ +import org.apache.gearpump.streaming.refactor.dsl.plan.functions.{DataSourceOp, Op, OpEdge} +import org.apache.gearpump.streaming.source.{DataSource, Watermark} +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.Graph + +import scala.language.implicitConversions + +/** + * Example: + * {{{ + * val data = "This is a good start, bingo!! bingo!!" + * app.fromCollection(data.lines.toList). + * // word => (word, count) + * flatMap(line => line.split("[\\s]+")).map((_, 1)). 
+ * // (word, count1), (word, count2) => (word, count1 + count2) + * groupBy(kv => kv._1).reduce(sum(_, _)) + * + * val appId = context.submit(app) + * context.close() + * }}} + * + * @param name name of app + */ +class StreamApp( + name: String, system: ActorSystem, userConfig: UserConfig, + private val graph: Graph[Op, OpEdge]) { + + def this(name: String, system: ActorSystem, userConfig: UserConfig) = { + this(name, system, userConfig, Graph.empty[Op, OpEdge]) + } + + def plan(): StreamApplication = { + implicit val actorSystem = system + val planner = new Planner + val dag = planner.plan(graph) + StreamApplication(name, dag, userConfig) + } +} + +object StreamApp { + def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty) + : StreamApp = { + new StreamApp(name, context.system, userConfig) + } + + implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = { + streamApp.plan() + } + + implicit class Source(app: StreamApp) extends java.io.Serializable { + + def source[T](dataSource: DataSource, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { + implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf) + app.graph.addVertex(sourceOp) + new Stream[T](app.graph, sourceOp) + } + + def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { + this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) + } + } +} + +/** A test message source which generated message sequence repeatedly. 
*/ +class CollectionDataSource[T](seq: Seq[T]) extends DataSource { + private lazy val iterator: Iterator[T] = seq.iterator + + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def read(): Message = { + if (iterator.hasNext) { + Message(iterator.next(), Instant.now()) + } else { + null + } + } + + override def close(): Unit = {} + + override def getWatermark: Instant = { + if (iterator.hasNext) { + Instant.now() + } else { + Watermark.MAX + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/functions/FlatMapWithStateFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/functions/FlatMapWithStateFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/functions/FlatMapWithStateFunction.scala new file mode 100644 index 0000000..cb878d6 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/functions/FlatMapWithStateFunction.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions + +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction +import org.apache.gearpump.streaming.refactor.dsl.javaapi.functions.{FlatMapWithStateFunction => JFlatMapWithStateFunction} +import org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction +import org.apache.gearpump.streaming.refactor.state.RuntimeContext + +import scala.collection.JavaConverters._ + +object FlatMapWithStateFunction { + + def apply[T, R](fn: JFlatMapWithStateFunction[T, R]): FlatMapWithStateFunction[T, R] = { + new FlatMapWithStateFunction[T, R] { + + override def setup(runtimeContext: RuntimeContext): Unit = { + fn.setup(runtimeContext) + } + + override def flatMap(t: T): TraversableOnce[R] = { + fn.flatMap(t).asScala + } + + + override def teardown(runtimeContext: RuntimeContext): Unit = { + fn.teardown(runtimeContext) + } + } + } + +// def apply[T, R](fn: T => TraversableOnce[R]): FlatMapWithStateFunction[T, R] = { +// new FlatMapWithStateFunction[T, R] { +// override def flatMap(t: T): TraversableOnce[R] = { +// fn(t) +// } +// } +// } + + def apply[T, R](fn: MapWithStateFunction[T, R]): FlatMapWithStateFunction[T, R] = { + new FlatMapWithStateFunction[T, R] { + + override def setup(runtimeContext: RuntimeContext): Unit = { + fn.setup(runtimeContext) + } + + override def flatMap(t: T): TraversableOnce[R] = { + Option(fn.map(t)) + } + + override def teardown(runtimeContext: RuntimeContext): Unit = { + fn.teardown(runtimeContext) + } + } + } + + def apply[T, R](fn: FilterFunction[T]): FlatMapWithStateFunction[T, T] = { + new FlatMapWithStateFunction[T, T] { + + override def setup(runtimeContext: RuntimeContext): Unit = { + // TODO + fn.setup() + } + + override def flatMap(t: T): 
TraversableOnce[T] = { + if (fn.filter(t)) { + Option(t) + } else { + None + } + } + + override def teardown(runtimeContext: RuntimeContext): Unit = { + // TODO + fn.teardown() + } + } + } +} + +/** + * Transforms one input into zero or more outputs of possibly different types. + * This Scala version of FlatMapFunction returns a TraversableOnce. + * + * @param T Input value type + * @param R Output value type + */ +abstract class FlatMapWithStateFunction[T, R] extends FlatMapFunction[T, R] { + + final override def setup(): Unit = { + throw new UnsupportedOperationException("please call or override " + + "setup(runtimeContext: RuntimeContext)") + } + + final override def teardown(): Unit = { + throw new UnsupportedOperationException("please call or override " + + " teardown(runtimeContext: RuntimeContext)") + } + + def setup(runtimeContext: RuntimeContext): Unit = {} + + def teardown(runtimeContext: RuntimeContext): Unit = {} + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/GroupByTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/GroupByTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/GroupByTask.scala new file mode 100644 index 0000000..16f8cc9 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/GroupByTask.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.refactor.dsl.task

import java.time.Instant
import java.util.function.Consumer

import com.gs.collections.impl.map.mutable.UnifiedMap
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
import org.apache.gearpump.streaming.refactor.dsl.window.impl.{TimestampedValue, WindowRunner}
import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StatefulTask}
import org.apache.gearpump.streaming.source.Watermark
// NOTE(review): the original import list also pulled in
// org.apache.gearpump.streaming.task.TaskUtil, which shadows the refactored
// TaskUtil in this package whose trigger(watermark, runner, context,
// runtimeContext) overload is the one invoked below — confirm and drop it.
import org.apache.gearpump.streaming.task.{Task, TaskContext}

/**
 * Processes messages in groups as defined by groupBy function.
 */
class GroupByTask[IN, GROUP, OUT](
    groupBy: IN => GROUP,
    taskContext: TaskContext,
    userConfig: UserConfig) extends StatefulTask(taskContext, userConfig) {

  // Captured in open(); needed when triggering the per-group window runners.
  private var runtimeContext: RuntimeContext = null

  def this(context: TaskContext, conf: UserConfig) = {
    this(
      conf.getValue[IN => GROUP](GEARPUMP_STREAMING_GROUPBY_FUNCTION)(context.system).get,
      context, conf
    )
  }

  // One WindowRunner per group, created lazily on the group's first message.
  private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] =
    new UnifiedMap[GROUP, WindowRunner[IN, OUT]]

  override def open(runtimeContext: RuntimeContext): Unit = {
    this.runtimeContext = runtimeContext
  }

  override def invoke(message: Message): Unit = {
    val input = message.value.asInstanceOf[IN]
    val group = groupBy(input)

    if (!groups.containsKey(group)) {
      groups.put(group,
        userConfig.getValue[WindowRunner[IN, OUT]](
          GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get)
    }

    // Reuse the already-cast input instead of casting message.value a second time.
    groups.get(group).process(TimestampedValue(input, message.timestamp))
  }

  override def onWatermarkProgress(watermark: Instant): Unit = {
    if (groups.isEmpty && watermark == Watermark.MAX) {
      // No groups were ever created; propagate the terminal watermark directly.
      taskContext.updateWatermark(Watermark.MAX)
    } else {
      groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] {
        override def accept(runner: WindowRunner[IN, OUT]): Unit = {
          TaskUtil.trigger(watermark, runner, taskContext, runtimeContext)
        }
      })
    }

    super.onWatermarkProgress(watermark)
  }

  override def close(runtimeContext: RuntimeContext): Unit = {
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.streaming.refactor.dsl.task

import java.time.Instant

import org.apache.gearpump.Message
import org.apache.gearpump.streaming.refactor.dsl.window.impl.{TimestampedValue, WindowRunner}
import org.apache.gearpump.streaming.refactor.state.RuntimeContext
import org.apache.gearpump.streaming.task._

object TaskUtil {

  /**
   * Resolves a classname to a Task class.
   *
   * @param className the class name to resolve
   * @return resolved class
   */
  def loadClass(className: String): Class[_ <: Task] = {
    val classLoader = Thread.currentThread().getContextClassLoader()
    classLoader.loadClass(className).asSubclass(classOf[Task])
  }

  /**
   * Triggers the window runner at the given watermark, propagating the
   * resulting watermark and emitting every triggered output as a Message.
   */
  def trigger[IN, OUT](watermark: Instant, runner: WindowRunner[IN, OUT],
      context: TaskContext, runtimeContext: RuntimeContext): Unit = {
    val triggered = runner.trigger(watermark, runtimeContext)
    context.updateWatermark(triggered.watermark)
    for (TimestampedValue(value, timestamp) <- triggered.outputs) {
      context.output(Message(value, timestamp))
    }
  }

  /**
   * @return t1 if t1 is not larger than t2 and t2 otherwise
   */
  def min(t1: Instant, t2: Instant): Instant = {
    if (t2.isBefore(t1)) t2 else t1
  }

  /**
   * @return t1 if t1 is not smaller than t2 and t2 otherwise
   */
  def max(t1: Instant, t2: Instant): Instant = {
    if (t1.isAfter(t2)) t1 else t2
  }
}
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gearpump.streaming.refactor.dsl.task

import java.time.Instant

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.refactor.dsl.window.impl.{TimestampedValue, WindowRunner}
import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StatefulTask}
import org.apache.gearpump.streaming.task.TaskContext

/**
 * A stateful task that feeds every incoming message into a [[WindowRunner]]
 * and emits triggered results whenever the watermark advances.
 */
class TransformTask[IN, OUT](
    runner: WindowRunner[IN, OUT],
    taskContext: TaskContext, userConf: UserConfig) extends StatefulTask(taskContext, userConf) {

  // Captured in open(); required when triggering the window runner.
  private var runtimeContext: RuntimeContext = null

  def this(context: TaskContext, conf: UserConfig) = {
    this(
      conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
      context, conf
    )
  }

  override def open(stateContext: RuntimeContext): Unit = {
    this.runtimeContext = stateContext
  }

  override def invoke(msg: Message): Unit = {
    runner.process(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp))
  }

  override def onWatermarkProgress(watermark: Instant): Unit = {
    TaskUtil.trigger(watermark, runner, taskContext, this.runtimeContext)
    // The parent StatefulTask performs the checkpoint on watermark progress.
    super.onWatermarkProgress(watermark)
  }
}
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala new file mode 100644 index 0000000..17c93bd --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gearpump.streaming.refactor.dsl.window.impl

import org.apache.gearpump.Message
import org.apache.gearpump.streaming.dsl.window.api.Trigger

/**
 * Contract for running a reduce function over incoming messages and
 * reacting when a window trigger fires.
 */
trait ReduceFnRunner {

  /** Accepts one incoming message for accumulation. */
  def process(message: Message): Unit

  /** Reacts to the firing of the given `trigger`. */
  def onTrigger(trigger: Trigger): Unit

}
package org.apache.gearpump.streaming.refactor.dsl.window.impl

import java.time.Instant

import com.gs.collections.api.block.predicate.Predicate
import com.gs.collections.api.block.procedure.Procedure
import com.gs.collections.impl.list.mutable.FastList
import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
import org.apache.gearpump.streaming.dsl.window.impl.Window
import org.apache.gearpump.streaming.refactor.dsl.plan.functions.FunctionRunner
import org.apache.gearpump.streaming.refactor.state.RuntimeContext
import org.apache.gearpump.streaming.source.Watermark
import org.apache.gearpump.streaming.task.TaskUtil

import scala.collection.mutable.ArrayBuffer

/** An element paired with its event-time timestamp. */
case class TimestampedValue[T](value: T, timestamp: Instant)

/** Outputs produced by a trigger, together with the resulting output watermark. */
case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]],
    watermark: Instant)

/**
 * Buffers timestamped elements into windows and emits results when
 * `trigger` is called with an advancing watermark.
 */
trait WindowRunner[IN, OUT] extends java.io.Serializable {

  def process(timestampedValue: TimestampedValue[IN]): Unit

  def trigger(time: Instant, runtimeContext: RuntimeContext): TriggeredOutputs[OUT]

}

/**
 * Chains two runners: elements are fed into `left`, and whatever `left`
 * emits on trigger is replayed into `right` before `right` itself is
 * triggered at `left`'s output watermark.
 */
case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
    right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {

  override def process(timestampedValue: TimestampedValue[IN]): Unit =
    left.process(timestampedValue)

  override def trigger(time: Instant, runtimeContext: RuntimeContext): TriggeredOutputs[OUT] = {
    val leftResult = left.trigger(time, runtimeContext)
    leftResult.outputs.foreach(right.process)
    right.trigger(leftResult.watermark, runtimeContext)
  }
}

/**
 * Default [[WindowRunner]]: assigns each element to windows via
 * `windows.windowFn`, buffers inputs per window (merging overlapping windows
 * when the window function is merging), and on trigger runs `fnRunner` over
 * every window whose end time is not after the trigger time.
 */
class DefaultWindowRunner[IN, OUT](
    windows: Windows,
    fnRunner: FunctionRunner[IN, OUT])
  extends WindowRunner[IN, OUT] {

  private val windowFn = windows.windowFn
  // window -> buffered inputs, sorted so the earliest window triggers first
  private val inputsByWindow = new TreeSortedMap[Window, FastList[TimestampedValue[IN]]]
  // whether fnRunner.setup has been called for the current accumulation
  private var runnerInitialized = false
  private var lastWatermark = Watermark.MIN

  override def process(timestampedValue: TimestampedValue[IN]): Unit = {
    val assigned = windowFn(new Context[IN] {
      override def element: IN = timestampedValue.value

      override def timestamp: Instant = timestampedValue.timestamp
    })
    assigned.foreach { window =>
      if (windowFn.isNonMerging) {
        // fixed windows: just append to this window's buffer
        if (!inputsByWindow.containsKey(window)) {
          inputsByWindow.put(window, new FastList[TimestampedValue[IN]])
        }
        inputsByWindow.get(window).add(timestampedValue)
      } else {
        merge(inputsByWindow, window, timestampedValue)
      }
    }
  }

  // Folds `window` and every buffered window intersecting it into one
  // spanning window, carrying all their buffered inputs along.
  private def merge(
      accumulated: TreeSortedMap[Window, FastList[TimestampedValue[IN]]],
      window: Window, element: TimestampedValue[IN]): Unit = {
    val intersecting = accumulated.keySet.select(new Predicate[Window] {
      override def accept(each: Window): Boolean = {
        window.intersects(each)
      }
    })
    var spanned = window
    val collected = FastList.newListWith(element)
    intersecting.forEach(new Procedure[Window] {
      override def value(each: Window): Unit = {
        spanned = spanned.span(each)
        collected.addAll(accumulated.remove(each))
      }
    })
    accumulated.put(spanned, collected)
  }

  override def trigger(time: Instant, runtimeContext: RuntimeContext): TriggeredOutputs[OUT] = {
    // Repeatedly fires the earliest buffered window while its end time has
    // been reached by `time`, accumulating outputs and the new watermark.
    @annotation.tailrec
    def fire(
        emitted: ArrayBuffer[TimestampedValue[OUT]],
        wm: Instant): TriggeredOutputs[OUT] = {
      if (inputsByWindow.notEmpty()) {
        val earliest = inputsByWindow.firstKey
        if (!time.isBefore(earliest.endTime)) {
          val pending = inputsByWindow.remove(earliest)
          if (!runnerInitialized) {
            fnRunner.setup(runtimeContext)
            runnerInitialized = true
          }
          pending.forEach(new Procedure[TimestampedValue[IN]] {
            override def value(tv: TimestampedValue[IN]): Unit = {
              fnRunner.process(tv.value).foreach {
                out: OUT => emitted += TimestampedValue(out, tv.timestamp)
              }
            }
          })
          fnRunner.finish().foreach {
            out: OUT =>
              emitted += TimestampedValue(out, earliest.endTime.minusMillis(1))
          }
          val advanced = TaskUtil.max(wm, earliest.endTime)
          if (windows.accumulationMode == Discarding) {
            fnRunner.teardown(runtimeContext)
            // discarding, setup need to be called for each window
            runnerInitialized = false
          }
          fire(emitted, advanced)
        } else {
          // minimum of end of last triggered window and start of first un-triggered window
          TriggeredOutputs(emitted, TaskUtil.min(wm, earliest.startTime))
        }
      } else if (time == Watermark.MAX) {
        TriggeredOutputs(emitted, Watermark.MAX)
      } else {
        TriggeredOutputs(emitted, wm)
      }
    }

    val result = fire(ArrayBuffer.empty[TimestampedValue[OUT]], lastWatermark)
    // the watermark never regresses
    lastWatermark = TaskUtil.max(lastWatermark, result.watermark)
    TriggeredOutputs(result.outputs, lastWatermark)
  }

}
package org.apache.gearpump.streaming.refactor.sink

import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.sink.DataSink

/**
 * Factory that wraps a [[DataSink]] into a [[DataSinkTask]] processor.
 */
object DataSinkProcessor {

  /**
   * @param dataSink    the sink written to by each task instance
   * @param parallelism number of parallel sink tasks
   * @param description human-readable processor description
   * @param taskConf    extra task configuration; the sink is serialized into it
   */
  def apply(
      dataSink: DataSink,
      parallelism: Int = 1,
      description: String = "",
      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
    : Processor[DataSinkTask] = {
    val conf = taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink)
    Processor[DataSinkTask](parallelism, description = description, conf)
  }
}
package org.apache.gearpump.streaming.refactor.sink

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StatefulTask}
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.task.TaskContext

object DataSinkTask {
  // config key under which the DataSink instance is serialized
  val DATA_SINK = "data_sink"
}

/**
 * Stateful task that writes every received message into a [[DataSink]],
 * opening the sink on task open and closing it on task close.
 */
class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: DataSink)
  extends StatefulTask(context, conf) {

  /** Deserializes the sink from user config. */
  def this(context: TaskContext, conf: UserConfig) = {
    this(context, conf, conf.getValue[DataSink](DataSinkTask.DATA_SINK)(context.system).get)
  }

  override def open(runtimeContext: RuntimeContext): Unit = {
    LOG.info("opening data sink...")
    sink.open(context)
  }

  override def invoke(message: Message): Unit = {
    sink.write(message)
  }

  override def close(runtimeContext: RuntimeContext): Unit = {
    LOG.info("closing data sink...")
    sink.close()
  }

}
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/source/DataSourceProcessor.scala new file mode 100644 index 0000000..de584d5 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/source/DataSourceProcessor.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gearpump.streaming.refactor.source

import akka.actor.ActorSystem
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.dsl.window.api.{WindowFunction, Windows}
import org.apache.gearpump.streaming.dsl.window.impl.Window
import org.apache.gearpump.streaming.refactor.dsl.plan.functions.DummyRunner
import org.apache.gearpump.streaming.refactor.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.{Constants, Processor}

/**
 * Factory that wraps a [[DataSource]] into a [[DataSourceTask]] processor.
 * The task is configured with a per-element window runner backed by a
 * [[DummyRunner]], i.e. source output passes through unchanged.
 */
object DataSourceProcessor {

  /**
   * @param dataSource  the source each task instance reads from
   * @param parallelism number of parallel source tasks
   * @param description human-readable processor description
   * @param taskConf    extra task configuration; source and runner are serialized into it
   */
  def apply(
      dataSource: DataSource,
      parallelism: Int = 1,
      description: String = "",
      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
    : Processor[DataSourceTask[Any, Any]] = {
    val passThroughRunner = new DefaultWindowRunner[Any, Any](
      Windows(PerElementWindowFunction, description = "perElementWindows"),
      new DummyRunner[Any])
    val conf = taskConf
      .withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource)
      .withValue[WindowRunner[Any, Any]](Constants.GEARPUMP_STREAMING_OPERATOR, passThroughRunner)
    Processor[DataSourceTask[Any, Any]](parallelism, description, conf)
  }

  /** Assigns each element its own 1-millisecond window starting at its timestamp. */
  case object PerElementWindowFunction extends WindowFunction {

    override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
      Array(Window(context.timestamp, context.timestamp.plusMillis(1)))
    }

    override def isNonMerging: Boolean = true
  }
}
package org.apache.gearpump.streaming.refactor.source

import java.time.Instant

import org.apache.gearpump._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.refactor.dsl.task.TaskUtil
import org.apache.gearpump.streaming.refactor.dsl.window.impl.{TimestampedValue, WindowRunner}
import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, StatefulTask}
import org.apache.gearpump.streaming.source.{DataSource, DataSourceConfig, Watermark}
import org.apache.gearpump.streaming.task.{Task, TaskContext}

/**
 * Default Task container for [[org.apache.gearpump.streaming.source.DataSource]] that
 * reads from the DataSource in batches.
 * See [[org.apache.gearpump.streaming.refactor.source.DataSourceProcessor]] for its usage.
 *
 * DataSourceTask calls:
 *  - `DataSource.open()` in `open`, passing in the
 *    [[org.apache.gearpump.streaming.task.TaskContext]] and the application start time
 *  - `DataSource.read()` in each `invoke`, which reads a batch of messages
 *  - `DataSource.close()` in `close`
 */
class DataSourceTask[IN, OUT] private[source](
    source: DataSource,
    windowRunner: WindowRunner[IN, OUT],
    context: TaskContext,
    conf: UserConfig)
  extends StatefulTask(context, conf) {

  // set in open(); null until the task is opened
  private var runtimeContext: RuntimeContext = null

  /** Deserializes the source and window runner from user config. */
  def this(context: TaskContext, conf: UserConfig) = {
    this(
      conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
      conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
      context, conf
    )
  }

  // number of messages pulled from the source per invoke; defaults to 1000
  private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)

  override def open(runtimeContext: RuntimeContext): Unit = {
    this.runtimeContext = runtimeContext
    LOG.info(s"opening data source at ${runtimeContext.getStartTime.toEpochMilli}")
    source.open(context, runtimeContext.getStartTime)
    // report the source's initial watermark to this task
    self ! Watermark(source.getWatermark)
    super.open(runtimeContext)
  }

  /** Reads up to `batchSize` messages, feeds them to the runner, then reports the watermark. */
  override def invoke(m: Message): Unit = {
    0.until(batchSize).foreach { _ =>
      // read() may return null when the source has nothing to emit
      Option(source.read()).foreach(
        msg => windowRunner.process(
          TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)))
    }

    self ! Watermark(source.getWatermark)
  }

  // NOTE(review): unlike TransformTask.onWatermarkProgress, this override does
  // not call super.onWatermarkProgress(watermark) — confirm that skipping the
  // parent's checkpoint hook is intended for source tasks.
  override def onWatermarkProgress(watermark: Instant): Unit = {
    TaskUtil.trigger(watermark, windowRunner, context, this.runtimeContext)
  }

  override def close(stateContext: RuntimeContext): Unit = {
    LOG.info("closing data source...")
    source.close()
  }

}
package org.apache.gearpump.streaming.refactor.state

import java.time.Instant

import org.apache.gearpump.streaming.refactor.coder.Coder
import org.apache.gearpump.streaming.refactor.state.api.StateInternals

/**
 * Runtime handle passed into a stateful task's lifecycle hooks (e.g.
 * `StatefulTask.open`/`close`), giving access to keyed state internals and
 * the application start time.
 */
trait RuntimeContext {

  /**
   * Returns the [[StateInternals]] for the given key.
   *
   * @param keyCoder coder used to encode the key — presumably for state
   *                 namespacing/serialization; confirm against StateInternals impl
   * @param key      the key whose state is requested
   */
  def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals

  /** The application start time. */
  def getStartTime: Instant

}
