fix #2007, add Java DSL
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/95d3c613 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/95d3c613 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/95d3c613 Branch: refs/heads/master Commit: 95d3c6138eba36d2f1ae169a570cd1abed623a13 Parents: ef11f16 Author: manuzhang <[email protected]> Authored: Fri Mar 18 12:45:22 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Apr 26 14:22:20 2016 +0800 ---------------------------------------------------------------------- .../examples/wordcountjava/dsl/WordCount.java | 86 ++++++++++++++++++++ .../examples/wordcount/dsl/WordCount.scala | 44 ++++++++++ .../javaapi/dsl/functions/FilterFunction.java | 28 +++++++ .../javaapi/dsl/functions/FlatMapFunction.java | 29 +++++++ .../javaapi/dsl/functions/GroupByFunction.java | 28 +++++++ .../javaapi/dsl/functions/MapFunction.java | 28 +++++++ .../javaapi/dsl/functions/ReduceFunction.java | 28 +++++++ .../streaming/dsl/example/WordCount.scala | 44 ---------- .../streaming/dsl/javaapi/JavaStream.scala | 64 +++++++++++++++ .../streaming/dsl/javaapi/JavaStreamApp.scala | 49 +++++++++++ 10 files changed, 384 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java new file mode 100644 index 0000000..6857017 --- /dev/null +++ 
b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.examples.wordcountjava.dsl;
+
+import com.typesafe.config.Config;
+import io.gearpump.cluster.ClusterConfig;
+import io.gearpump.cluster.UserConfig;
+import io.gearpump.cluster.client.ClientContext;
+import io.gearpump.google.common.collect.Lists;
+import io.gearpump.streaming.dsl.javaapi.JavaStream;
+import io.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import io.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
+import io.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
+import io.gearpump.streaming.javaapi.dsl.functions.MapFunction;
+import io.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
+import scala.Tuple2;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Word count example written against the Java DSL ({@link JavaStreamApp} / {@link JavaStream}).
+ *
+ * <p>Pipeline: source sentence -> split into words -> (word, 1) -> group by word
+ * -> sum the counts per word -> log the result.
+ */
+public class WordCount {
+
+  /**
+   * Runs the example with {@link ClusterConfig#defaultConfig()}.
+   *
+   * @param args passed through to {@link #main(Config, String[])}
+   */
+  public static void main(String[] args) throws InterruptedException {
+    main(ClusterConfig.defaultConfig(), args);
+  }
+
+  /**
+   * Builds the word count stream, submits it through a {@link ClientContext} created from
+   * {@code akkaConf}, and closes that context afterwards.
+   *
+   * @param akkaConf configuration used to create the client context
+   * @param args currently unused
+   */
+  public static void main(Config akkaConf, String[] args) throws InterruptedException {
+    ClientContext context = new ClientContext(akkaConf);
+    try {
+      JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty());
+      List<String> source = Lists.newArrayList("This is a good start, bingo!! bingo!!");
+
+      JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source");
+
+      // The anonymous functions below are created in a static method, so they capture no
+      // enclosing instance; only the function object itself needs to be Serializable.
+
+      // sentence => words
+      JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() {
+        @Override
+        public Iterator<String> apply(String s) {
+          return Lists.newArrayList(s.split("\\s+")).iterator();
+        }
+      }, "flatMap");
+
+      // word => (word, 1)
+      JavaStream<Tuple2<String, Integer>> ones =
+          words.map(new MapFunction<String, Tuple2<String, Integer>>() {
+            @Override
+            public Tuple2<String, Integer> apply(String s) {
+              return new Tuple2<String, Integer>(s, 1);
+            }
+          }, "map");
+
+      // group the (word, 1) pairs by the word, i.e. the tuple's first element
+      JavaStream<Tuple2<String, Integer>> groupedOnes =
+          ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() {
+            @Override
+            public String apply(Tuple2<String, Integer> tuple) {
+              return tuple._1();
+            }
+          }, 1, "groupBy");
+
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      JavaStream<Tuple2<String, Integer>> wordcount =
+          groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+            @Override
+            public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1,
+                Tuple2<String, Integer> t2) {
+              return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2());
+            }
+          }, "reduce");
+
+      wordcount.log();
+
+      app.run();
+    } finally {
+      // Release the client context even if building or submitting the application fails.
+      context.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
new file mode 100644
index 0000000..cc516db
--- /dev/null
+++ b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.examples.wordcount.dsl
+
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.streaming.dsl.StreamApp
+import io.gearpump.streaming.dsl.StreamApp._
+import io.gearpump.util.AkkaApp
+
+/**
+ * Word count example written against the Scala DSL: splits a fixed sentence into words,
+ * sums a count of 1 per word by key, logs the result and submits the application.
+ */
+object WordCount extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+    try {
+      val app = StreamApp("dsl", context)
+      val data = "This is a good start, bingo!! bingo!!"
+      app.source(data.lines.toList, 1, "source").
+        // word => (word, count)
+        flatMap(line => line.split("[\\s]+")).map((_, 1)).
+        // (word, count1), (word, count2) => (word, count1 + count2)
+        groupByKey().sum.log
+
+      context.submit(app)
+    } finally {
+      // Release the client context even if building or submitting the application fails.
+      context.close()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
new file mode 100644
index 0000000..e4e137f
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * A predicate that decides whether a value of type {@code T} is kept in the stream.
+ *
+ * <p>Extends {@link Serializable}, presumably so the function can be shipped along with the
+ * task that runs it.
+ *
+ * @param <T> type of the values being tested
+ */
+public interface FilterFunction<T> extends Serializable {
+
+  /**
+   * Tests a single value.
+   *
+   * @param t the value to test
+   * @return {@code true} to keep {@code t}, {@code false} to drop it
+   */
+  boolean apply(T t);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
new file mode 100644
index 0000000..b65a338
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * A function that maps one value of type {@code T} to an iterator over zero or more values
+ * of type {@code R}; the produced values are flattened into the output stream.
+ *
+ * @param <T> type of the input values
+ * @param <R> type of the output values
+ */
+public interface FlatMapFunction<T, R> extends Serializable {
+
+  /**
+   * Expands a single value.
+   *
+   * @param t the input value
+   * @return an iterator over the values produced for {@code t}; may be empty
+   */
+  Iterator<R> apply(T t);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
new file mode 100644
index 0000000..651c477
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * A function that extracts, from a value of type {@code T}, the key of type {@code Group}
+ * under which that value is grouped.
+ *
+ * @param <T> type of the values being grouped
+ * @param <Group> type of the grouping key
+ */
+public interface GroupByFunction<T, Group> extends Serializable {
+
+  /**
+   * Computes the grouping key of a value.
+   *
+   * @param t the value to classify
+   * @return the key of the group {@code t} belongs to
+   */
+  Group apply(T t);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
new file mode 100644
index 0000000..a30a671
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * A function that converts a value of type {@code T} into exactly one value of type
+ * {@code R}.
+ *
+ * @param <T> type of the input values
+ * @param <R> type of the output values
+ */
+public interface MapFunction<T, R> extends Serializable {
+
+  /**
+   * Converts a single value.
+   *
+   * @param t the input value
+   * @return the converted value
+   */
+  R apply(T t);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
new file mode 100644
index 0000000..0f4bb18
--- /dev/null
+++ b/streaming/src/main/java/io/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.javaapi.dsl.functions;
+
+import java.io.Serializable;
+
+/**
+ * A function that combines two values of type {@code T} into a single value of the same type,
+ * e.g. {@code (word, count1), (word, count2) => (word, count1 + count2)}.
+ *
+ * @param <T> type of the values being combined
+ */
+public interface ReduceFunction<T> extends Serializable {
+
+  /**
+   * Combines two values.
+   *
+   * @param t1 the first value
+   * @param t2 the second value
+   * @return the combination of {@code t1} and {@code t2}
+   */
+  T apply(T t1, T t2);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/scala/io/gearpump/streaming/dsl/example/WordCount.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/example/WordCount.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/example/WordCount.scala
deleted file mode 100644
index 6e168a5..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/example/WordCount.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package io.gearpump.streaming.dsl.example - -import io.gearpump.streaming.dsl.StreamApp -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import StreamApp._ -import com.typesafe.config.Config -import io.gearpump.util.AkkaApp - -object WordCount extends AkkaApp with ArgumentsParser{ - - override val options: Array[(String, CLIOption[Any])] = Array.empty - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val context = ClientContext(akkaConf) - val app = StreamApp("dsl", context) - val data = "This is a good start, bingo!! bingo!!" - app.source(data.lines.toList, 1, "source"). - // word => (word, count) - flatMap(line => line.split("[\\s]+")).map((_, 1)). - // (word, count1), (word, count2) => (word, count1 + count2) - groupByKey().sum.log - - val appId = context.submit(app) - context.close() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala new file mode 100644 index 0000000..42d6ca7 --- /dev/null +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.streaming.dsl.javaapi
+
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.dsl.Stream
+import io.gearpump.streaming.javaapi.dsl.functions._
+import io.gearpump.streaming.task.Task
+
+import scala.collection.JavaConverters._
+
+/**
+ * Java-friendly facade over the Scala DSL [[Stream]].
+ *
+ * Every operation adapts one of the Java function interfaces into a Scala closure,
+ * delegates to the wrapped `stream`, and wraps the resulting Scala stream in a new
+ * `JavaStream`.
+ *
+ * @param stream the underlying Scala DSL stream
+ */
+class JavaStream[T](val stream: Stream[T]) {
+
+  /** Applies `fn` to each element and flattens the returned `java.util.Iterator` into the output. */
+  def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = {
+    new JavaStream[R](stream.flatMap((elem: T) => fn.apply(elem).asScala, description))
+  }
+
+  /** Converts each element one-to-one with `fn`. */
+  def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
+    new JavaStream[R](stream.map((elem: T) => fn.apply(elem), description))
+  }
+
+  /** Delegates to the Scala `filter`, using `fn` as the predicate. */
+  def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
+    new JavaStream[T](stream.filter((elem: T) => fn.apply(elem), description))
+  }
+
+  /** Delegates to the Scala `reduce`, combining elements pairwise with `fn`. */
+  def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
+    new JavaStream[T](stream.reduce((left: T, right: T) => fn.apply(left, right), description))
+  }
+
+  /** Delegates to the underlying stream's `log()`. */
+  def log(): Unit = stream.log()
+
+  /**
+   * Merges this stream with `other`.
+   *
+   * Note: Scala default arguments are not visible from Java, so Java callers must always
+   * pass `description` explicitly.
+   */
+  def merge(other: JavaStream[T], description: String = null): JavaStream[T] = {
+    new JavaStream[T](stream.merge(other.stream, description))
+  }
+
+  /** Groups elements by the key `fn` extracts, with the given `parallelism`; element type is unchanged. */
+  def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String): JavaStream[T] = {
+    new JavaStream[T](stream.groupBy((elem: T) => fn.apply(elem), parallelism, description))
+  }
+
+  /**
+   * Routes the stream through a user-supplied `Task` class.
+   *
+   * The output element type `R` is whatever the caller declares; it is not checked against
+   * what `processor` actually emits.
+   */
+  def process[R](processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String): JavaStream[R] = {
+    new JavaStream[R](stream.process(processor, parallelism, conf, description))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/95d3c613/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala new file mode 100644 index 0000000..0ad03cd --- /dev/null +++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package io.gearpump.streaming.dsl.javaapi
+
+import java.util.Collection
+
+import io.gearpump.cluster.UserConfig
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
+import io.gearpump.streaming.source.DataSource
+
+import scala.collection.JavaConverters._
+
+/**
+ * Java entry point of the DSL: wraps a Scala [[StreamApp]] and exposes sources as [[JavaStream]]s.
+ *
+ * @param name application name passed to `StreamApp`
+ * @param context client context used to submit the application in [[run]]
+ * @param userConfig application-level configuration passed to `StreamApp`
+ */
+class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) {
+
+  private val streamApp = StreamApp(name, context, userConfig)
+
+  /**
+   * Creates a source stream that emits the elements of a Java collection.
+   *
+   * The collection is copied when this method is called, so later changes to it are not
+   * seen by the stream.
+   */
+  def source[T](collection: Collection[T], parallelism: Int,
+      conf: UserConfig, description: String): JavaStream[T] = {
+    // `asScala.toSeq` on a java.util.Collection returns a lazy scala Stream that keeps pulling
+    // from the live Java iterator. That is not a snapshot, and it may fail to serialize if the
+    // data source is shipped to executors. `toList` materializes a strict, immutable copy.
+    val dataSource = new CollectionDataSource(collection.asScala.toList)
+    source(dataSource, parallelism, conf, description)
+  }
+
+  /** Creates a source stream backed by an arbitrary `DataSource`. */
+  def source[T](dataSource: DataSource, parallelism: Int,
+      conf: UserConfig, description: String): JavaStream[T] = {
+    new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description))
+  }
+
+  /** Submits the application through the client context; the returned application id is discarded. */
+  def run(): Unit = {
+    context.submit(streamApp)
+  }
+
+}
