This is an automated email from the ASF dual-hosted git repository.
kamir pushed a commit to branch kamir-patch-2
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
The following commit(s) were added to refs/heads/kamir-patch-2 by this push:
new a7afeee4 renamed examples to applications
a7afeee4 is described below
commit a7afeee4299e0f80149c1b101cc929cd2aeb4c3c
Author: Mirko Kämpf <[email protected]>
AuthorDate: Tue Aug 27 14:58:57 2024 +0200
renamed examples to applications
---
README.md | 2 +-
bin/wayang-submit | 7 +-
guides/develop-with-Wayang.md | 8 +-
guides/ml-in-Wayang.md | 2 +-
guides/tutorial.md | 4 +-
guides/wayang-examples.md | 8 +-
pom.xml | 7 +-
wayang-api/wayang-api-scala-java/README.md | 6 +-
wayang-applications/bin/run_wordcount.sh | 31 +++++
wayang-applications/bin/run_wordcount_kafka.sh | 31 +++++
.../data/case-study/DATA_REPO_001/README.md | 3 +
.../case-study/My Journey to Apache Wayang.docx | Bin 0 -> 14729 bytes
wayang-applications/data/case-study/README.md | 3 +
wayang-applications/pom.xml | 145 +++++++++++++++++++++
.../java/org/apache/wayang/applications/App.java | 28 ++++
.../wayang/applications/OutputSerializer.java | 37 ++++++
.../java/org/apache/wayang/applications/Util.java | 27 ++++
.../org/apache/wayang/applications/WordCount.java | 87 +++++++++++++
wayang-assembly/dependency-reduced-pom.xml | 2 +-
wayang-docs/src/main/resources/index.md | 6 +-
.../wayang/java/operators/JavaKafkaTopicSink.java | 23 ++--
...leSinkTest.java => JavaKafkaTopicSinkTest.java} | 106 ++++++++++-----
.../java/operators/JavaKafkaTopicSourceTest.java | 7 +-
.../java/operators/JavaTextFileSinkTest.java | 9 +-
24 files changed, 522 insertions(+), 67 deletions(-)
diff --git a/README.md b/README.md
index b4a0130d..46d893c1 100644
--- a/README.md
+++ b/README.md
@@ -48,7 +48,7 @@ Apache Wayang (incubating) can be used via the following APIs:
## Quick Guide for Running Wayang
-For a quick guide on how to run WordCount see [here](guides/tutorial.md).
+For a quick guide on how to run WordCount see [here](guides/tutorial.md).
## Quick Guide for Developing with Wayang
diff --git a/bin/wayang-submit b/bin/wayang-submit
index 28ee041f..d9658c33 100755
--- a/bin/wayang-submit
+++ b/bin/wayang-submit
@@ -18,7 +18,6 @@
CLASS=$1
-
if [ -z "${CLASS}" ]; then
echo "Target Class for execution was not provided"
exit 1
@@ -120,5 +119,11 @@ do
ARGS="$ARGS \"${arg}\""
done
+WAYANG_CLASSPATH="${WAYANG_CLASSPATH}:${WAYANG_APP_HOME}"
+
+echo $WAYANG_CLASSPATH
+echo
+echo "[EXECUTE] :: $RUNNER $FLAGS -cp "${WAYANG_CLASSPATH}" $CLASS ${ARGS}"
+echo
eval "$RUNNER $FLAGS -cp "${WAYANG_CLASSPATH}" $CLASS ${ARGS}"
diff --git a/guides/develop-with-Wayang.md b/guides/develop-with-Wayang.md
index 430cf2db..ddd3a84b 100644
--- a/guides/develop-with-Wayang.md
+++ b/guides/develop-with-Wayang.md
@@ -71,7 +71,7 @@ This tutorial shows users how to import Wayang in their Java
project using the m
```
A sample pom file can be found [here](pom-example.xml).
-# Test WordCount
+# Test WordCount
## Create a Java class that contains the main method that runs the Wordcount
Here is a sample implementation getting as input the filename (e.g.,
file:/Projects/Wayang/test.txt)
@@ -85,8 +85,8 @@ public static void main(String[] args) {
/* Get a plan builder */
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
- .withJobName("WordCount")
- .withUdfJarOf(WordCount.class);
+ .withJobName("WordCount")
+ .withUdfJarOf(WordCount.class);
/* Start building the Apache WayangPlan */
Collection<Tuple2<String, Integer>> wordcounts = planBuilder
@@ -119,4 +119,4 @@ public static void main(String[] args) {
System.out.println(wordcounts);
}
```
-A sample Java class file can be found [here](WordCount.java).
+A sample Java class file can be found [here](WordCount.java).
diff --git a/guides/ml-in-Wayang.md b/guides/ml-in-Wayang.md
index dc693337..bd0392b5 100644
--- a/guides/ml-in-Wayang.md
+++ b/guides/ml-in-Wayang.md
@@ -28,7 +28,7 @@ public class CustomEstimatableCost implements EstimatableCost
{
* by implementing the interface in this class.
*/
}
-public class WordCount {
+public class WordCount {
public static void main(String[] args) {
/* Create a Wayang context and specify the platforms Wayang will
consider */
Configuration config = new Configuration();
diff --git a/guides/tutorial.md b/guides/tutorial.md
index f3fdca60..1f196cbd 100644
--- a/guides/tutorial.md
+++ b/guides/tutorial.md
@@ -16,7 +16,7 @@
limitations under the License.
-->
-This tutorial will show users how to run the WordCount example locally with
Wayang.
+This tutorial will show users how to run the WordCount example locally with Wayang.
# Clone repository
```shell
@@ -57,7 +57,7 @@ source ~/.zshrc
# Run the program
-To execute the WordCount example with Apache Wayang, you need to execute your
program with the 'wayang-submit' command:
+To execute the WordCount example with Apache Wayang, you need to execute your program with the 'wayang-submit' command:
```shell
cd wayang-0.7.1-SNAPSHOT
diff --git a/guides/wayang-examples.md b/guides/wayang-examples.md
index 2717dde0..cd464139 100644
--- a/guides/wayang-examples.md
+++ b/guides/wayang-examples.md
@@ -17,13 +17,13 @@
-->
This page contains examples to be executed using Wayang.
-- [WordCount](#wordcount)
+- [WordCount](#wordcount)
* [Java scala-like API](#java-scala-like-api)
* [Scala API](#scala-api)
- [k-means](#k-means)
* [Scala API](#scala-api-1)
-## WordCount
+## WordCount
The "Hello World!" of data processing systems is the wordcount.
@@ -51,7 +51,7 @@ public class WordcountJava {
.withPlugin(Java.basicPlugin())
.withPlugin(Spark.basicPlugin());
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
- .withJobName(String.format("WordCount (%s)", inputUrl))
+
.withJobName(String.format("org.apache.wayang.examples.WordCount (%s)",
inputUrl))
.withUdfJarOf(WordcountJava.class);
// Start building the WayangPlan.
@@ -107,7 +107,7 @@ object WordcountScala {
.withPlugin(Java.basicPlugin)
.withPlugin(Spark.basicPlugin)
val planBuilder = new PlanBuilder(wayangContext)
- .withJobName(s"WordCount ($inputUrl)")
+ .withJobName(s"org.apache.wayang.examples.WordCount ($inputUrl)")
.withUdfJarsOf(this.getClass)
val wordcounts = planBuilder
diff --git a/pom.xml b/pom.xml
index 0aa63c58..1a35f001 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1338,14 +1338,15 @@
<modules>
<module>wayang-commons</module>
<module>wayang-platforms</module>
- <module>wayang-tests-integration</module>
<module>wayang-api</module>
<module>wayang-profiler</module>
<module>wayang-plugins</module>
<module>wayang-resources</module>
- <module>wayang-benchmark</module>
<module>wayang-assembly</module>
<module>wayang-ml4all</module>
- <!-- <module>wayang-docs</module> -->
+ <module>wayang-applications</module>
+ <module>wayang-benchmark</module>
+ <module>wayang-tests-integration</module>
+ <!--module>wayang-docs</module-->
</modules>
</project>
diff --git a/wayang-api/wayang-api-scala-java/README.md
b/wayang-api/wayang-api-scala-java/README.md
index 085741b6..21ae71d0 100644
--- a/wayang-api/wayang-api-scala-java/README.md
+++ b/wayang-api/wayang-api-scala-java/README.md
@@ -87,12 +87,12 @@ import org.apache.wayang.core.api.Configuration
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark
-class WordCount {}
+class WordCount {}
-object WordCount {
+object WordCount {
def main(args: Array[String]): Unit = {
- println("WordCount")
+ println("WordCount")
println("Scala version:")
println(scala.util.Properties.versionString)
diff --git a/wayang-applications/bin/run_wordcount.sh
b/wayang-applications/bin/run_wordcount.sh
new file mode 100755
index 00000000..87b026a9
--- /dev/null
+++ b/wayang-applications/bin/run_wordcount.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
# Builds the wayang-applications module and runs its WordCount application on
# the bundled sample file. All paths are resolved relative to this script, so it
# can be started from any working directory (the former bare "cd .." calls only
# worked when invoked from wayang-applications/bin).

# Use the caller's JDK instead of a developer-specific hard-coded path.
if [ -z "${JAVA_HOME:-}" ]; then
  echo "JAVA_HOME is not set; please point it to a JDK (11 or newer)." >&2
  exit 1
fi
export JAVA_HOME

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
WAYANG_REPO="$(cd "${SCRIPT_DIR}/../.." && pwd)"

# To (re)build the Wayang distribution first, run from ${WAYANG_REPO}:
#   mvn clean compile package install -pl :wayang-assembly -Pdistribution -DskipTests

cd "${WAYANG_REPO}/wayang-applications" || exit 1
# Stop here if the build fails instead of submitting a stale/missing jar.
mvn compile package install -DskipTests || exit 1

cd "${WAYANG_REPO}" || exit 1

# NOTE(review): .env.sh presumably prepares the environment for bin/wayang-submit — confirm.
source ./.env.sh; bin/wayang-submit org.apache.wayang.applications.WordCount java "file://${WAYANG_REPO}/wayang-applications/data/case-study/DATA_REPO_001/README.md"
diff --git a/wayang-applications/bin/run_wordcount_kafka.sh
b/wayang-applications/bin/run_wordcount_kafka.sh
new file mode 100755
index 00000000..87b026a9
--- /dev/null
+++ b/wayang-applications/bin/run_wordcount_kafka.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
# Builds the wayang-applications module and runs WordCount.
# TODO(review): despite its name this script is identical to run_wordcount.sh —
# it runs the file-based org.apache.wayang.applications.WordCount, not a Kafka
# variant. Point it at the Kafka application once that class exists.
# All paths are resolved relative to this script, so it can be started from any
# working directory.

# Use the caller's JDK instead of a developer-specific hard-coded path.
if [ -z "${JAVA_HOME:-}" ]; then
  echo "JAVA_HOME is not set; please point it to a JDK (11 or newer)." >&2
  exit 1
fi
export JAVA_HOME

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
WAYANG_REPO="$(cd "${SCRIPT_DIR}/../.." && pwd)"

# To (re)build the Wayang distribution first, run from ${WAYANG_REPO}:
#   mvn clean compile package install -pl :wayang-assembly -Pdistribution -DskipTests

cd "${WAYANG_REPO}/wayang-applications" || exit 1
# Stop here if the build fails instead of submitting a stale/missing jar.
mvn compile package install -DskipTests || exit 1

cd "${WAYANG_REPO}" || exit 1

# NOTE(review): .env.sh presumably prepares the environment for bin/wayang-submit — confirm.
source ./.env.sh; bin/wayang-submit org.apache.wayang.applications.WordCount java "file://${WAYANG_REPO}/wayang-applications/data/case-study/DATA_REPO_001/README.md"
diff --git a/wayang-applications/data/case-study/DATA_REPO_001/README.md
b/wayang-applications/data/case-study/DATA_REPO_001/README.md
new file mode 100644
index 00000000..79b51add
--- /dev/null
+++ b/wayang-applications/data/case-study/DATA_REPO_001/README.md
@@ -0,0 +1,3 @@
+# Blossom Sky Setup on the "central node"
+
+docker pull ghcr.io/databloom-ai/bde:main
diff --git a/wayang-applications/data/case-study/My Journey to Apache
Wayang.docx b/wayang-applications/data/case-study/My Journey to Apache
Wayang.docx
new file mode 100644
index 00000000..b6e8d779
Binary files /dev/null and b/wayang-applications/data/case-study/My Journey to
Apache Wayang.docx differ
diff --git a/wayang-applications/data/case-study/README.md
b/wayang-applications/data/case-study/README.md
new file mode 100644
index 00000000..79b51add
--- /dev/null
+++ b/wayang-applications/data/case-study/README.md
@@ -0,0 +1,3 @@
+# Blossom Sky Setup on the "central node"
+
+docker pull ghcr.io/databloom-ai/bde:main
diff --git a/wayang-applications/pom.xml b/wayang-applications/pom.xml
new file mode 100644
index 00000000..756b6b20
--- /dev/null
+++ b/wayang-applications/pom.xml
@@ -0,0 +1,145 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <!-- Example applications (e.g. WordCount) built on top of Apache Wayang. -->
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.wayang</groupId>
        <artifactId>wayang</artifactId>
        <version>0.7.1</version>
    </parent>

    <artifactId>wayang-applications</artifactId>
    <packaging>jar</packaging>

    <name>wayang-applications</name>
    <url>http://maven.apache.org</url>


    <properties>
        <!-- ${project.basedir} already is the wayang-applications directory. -->
        <DATA_REPO_001>file://${project.basedir}/data/case-study/DATA_REPO_001</DATA_REPO_001>
        <DATA_REPO_002>https://kamir.solidcommunity.net/public/ecolytiq-sustainability-profile</DATA_REPO_002>
        <DIST_WAYANG_HOME>${project.basedir}/../wayang-assembly/target/apache-wayang-assembly-${project.version}-incubating-dist/wayang-${project.version}</DIST_WAYANG_HOME>
        <!-- Build against the Wayang modules of the same version as this module (inherited from the parent). -->
        <WAYANG_VERSION>${project.version}</WAYANG_VERSION>

        <spark.version>3.5.0</spark.version>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>

        <!-- We also specify the file encoding of our source files, to avoid a warning -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <assertj.version>3.17.2</assertj.version>
        <commons-io.version>2.5</commons-io.version>
        <guava.version>19.0</guava.version>
        <hamcrest.version>1.3</hamcrest.version>
        <jackson.version>2.10.2</jackson.version>
        <jacoco.version>0.8.5</jacoco.version>
        <jodatime.version>2.10.6</jodatime.version>
        <jsonpath.version>2.4.0</jsonpath.version>
        <junit5.version>5.6.1</junit5.version>
        <mockito.version>3.5.10</mockito.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.wayang</groupId>
            <artifactId>wayang-core</artifactId>
            <version>${WAYANG_VERSION}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.wayang</groupId>
            <artifactId>wayang-basic</artifactId>
            <version>${WAYANG_VERSION}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.wayang</groupId>
            <artifactId>wayang-java</artifactId>
            <version>${WAYANG_VERSION}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.wayang</groupId>
            <artifactId>wayang-spark_2.12</artifactId>
            <version>${WAYANG_VERSION}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.wayang</groupId>
            <artifactId>wayang-api-scala-java_2.12</artifactId>
            <version>${WAYANG_VERSION}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.13</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.4</version>
        </dependency>

        <!-- Test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <!-- Keep JUnit off the compile/runtime classpath of the application. -->
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.3.1</version>
            </plugin>
        </plugins>
    </build>
</project>
diff --git
a/wayang-applications/src/main/java/org/apache/wayang/applications/App.java
b/wayang-applications/src/main/java/org/apache/wayang/applications/App.java
new file mode 100644
index 00000000..4463b67e
--- /dev/null
+++ b/wayang-applications/src/main/java/org/apache/wayang/applications/App.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.applications;
+
/**
 * Minimal entry point of the wayang-applications module: it only prints a
 * greeting to standard output and ignores its arguments.
 */
public class App {

    /** Text written to standard output by {@link #main(String[])}. */
    private static final String GREETING = "Hello Apache Wayang friends!";

    /**
     * Prints the greeting.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println(GREETING);
    }
}
diff --git
a/wayang-applications/src/main/java/org/apache/wayang/applications/OutputSerializer.java
b/wayang-applications/src/main/java/org/apache/wayang/applications/OutputSerializer.java
new file mode 100644
index 00000000..679a01c2
--- /dev/null
+++
b/wayang-applications/src/main/java/org/apache/wayang/applications/OutputSerializer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.applications;
+
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction;
+import org.apache.wayang.basic.data.Tuple2;
+
+public class OutputSerializer implements SerializableFunction<Tuple2<String,
Integer>, String> {
+
+ @Override
+ public String apply(Tuple2<String, Integer> tuple) {
+ return tuple.getField0() + ": " + tuple.getField1();
+ }
+
+ // Example usage within a main method or similar test environment
+ public static void main(String[] args) {
+ OutputSerializer formatter = new OutputSerializer();
+ Tuple2<String, Integer> exampleTuple = new Tuple2<>("Age", 30);
+ String result = formatter.apply(exampleTuple);
+ System.out.println(result); // Output: Age: 30
+ }
+}
diff --git
a/wayang-applications/src/main/java/org/apache/wayang/applications/Util.java
b/wayang-applications/src/main/java/org/apache/wayang/applications/Util.java
new file mode 100644
index 00000000..747feb78
--- /dev/null
+++ b/wayang-applications/src/main/java/org/apache/wayang/applications/Util.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.applications;
+
+import java.io.Serializable;
+
/**
 * Small formatting helpers shared by the Wayang demo applications.
 */
public class Util implements Serializable {

    /**
     * Formats a text/number pair as {@code "<f1>, <f2>"}, e.g.
     * {@code formatData("wayang", 3)} returns {@code "wayang, 3"}.
     *
     * @param f1 the text field (e.g. a word); {@code null} is rendered as {@code "null"}
     * @param f2 the numeric field (e.g. a count); {@code null} is rendered as {@code "null"}
     * @return the formatted pair
     */
    public static String formatData( String f1, Integer f2 ) {
        // The former String.format("%d, %s", f1, f2) applied %d to the String
        // argument and therefore threw IllegalFormatConversionException on every
        // call. Plain concatenation also avoids locale-specific digit rendering.
        return f1 + ", " + f2;
    }
}
diff --git
a/wayang-applications/src/main/java/org/apache/wayang/applications/WordCount.java
b/wayang-applications/src/main/java/org/apache/wayang/applications/WordCount.java
new file mode 100644
index 00000000..abd3d332
--- /dev/null
+++
b/wayang-applications/src/main/java/org/apache/wayang/applications/WordCount.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.applications;
+
+import org.apache.wayang.api.JavaPlanBuilder;
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.WayangContext;
+import
org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+import java.util.Collection;
+import java.util.Arrays;
+
+public class WordCount {
+
+ public static void main(String[] args){
+
+ System.out.println( ">>> Apache Wayang Test #01");
+ System.out.println( " We use a local file and a 'Java Context'.");
+ int i = 0;
+ for (String arg : args) {
+ String line = String.format( " %d - %s", i,arg);
+ System.out.println(line);
+ i=i+1;
+ }
+
+ // Settings
+ String inputUrl = args[1];
+
+ // Get a plan builder.
+ WayangContext wayangContext = new WayangContext(new Configuration())
+ .withPlugin(Java.basicPlugin());
+ // .withPlugin(Spark.basicPlugin());
+ JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
+
.withJobName(String.format("org.apache.wayang.examples.WordCount (%s)",
inputUrl))
+ .withUdfJarOf(WordCount.class);
+
+ // Start building the WayangPlan.
+ Collection<Tuple2<String, Integer>> wordcounts = planBuilder
+ // Read the text file.
+ .readTextFile(inputUrl).withName("Load file")
+
+ // Split each line by non-word characters.
+ .flatMap(line -> Arrays.asList(line.split("\\W+")))
+ .withSelectivity(10, 100, 0.9)
+ .withName("Split words")
+
+ // Filter empty tokens.
+ .filter(token -> !token.isEmpty())
+ .withSelectivity(0.99, 0.99, 0.99)
+ .withName("Filter empty words")
+
+ // Attach counter to each word.
+ .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To
lower case, add counter")
+
+ // Sum up counters for every word.
+ .reduceByKey(
+ Tuple2::getField0,
+ (t1, t2) -> new Tuple2<>(t1.getField0(),
t1.getField1() + t2.getField1())
+ )
+ .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9,
1, false, in -> Math.round(0.01 * in[0])))
+ .withName("Add counters")
+
+ // Execute the plan and collect the results.
+ .collect();
+
+ System.out.println(wordcounts);
+ System.out.println( "*** Done. ***" );
+ }
+}
\ No newline at end of file
diff --git a/wayang-assembly/dependency-reduced-pom.xml
b/wayang-assembly/dependency-reduced-pom.xml
index 7713b09f..70d4e89b 100644
--- a/wayang-assembly/dependency-reduced-pom.xml
+++ b/wayang-assembly/dependency-reduced-pom.xml
@@ -1,4 +1,3 @@
-<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -17,6 +16,7 @@
specific language governing permissions and limitations
under the License.
-->
+<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<artifactId>wayang</artifactId>
diff --git a/wayang-docs/src/main/resources/index.md
b/wayang-docs/src/main/resources/index.md
index 14c6834f..3252a7d6 100644
--- a/wayang-docs/src/main/resources/index.md
+++ b/wayang-docs/src/main/resources/index.md
@@ -172,7 +172,7 @@ This tool will attempt to determine suitable values for the
question marks (`?`)
For some executable examples, have a look at [this
repository](https://github.com/sekruse/rheem-examples).
-### WordCount
+### WordCount
#### Java API
```java
@@ -198,7 +198,7 @@ public class WordcountJava {
.withPlugin(Java.basicPlugin())
.withPlugin(Spark.basicPlugin());
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
- .withJobName(String.format("WordCount (%s)", inputUrl))
+
.withJobName(String.format("org.apache.wayang.examples.WordCount (%s)",
inputUrl))
.withUdfJarOf(WordcountJava.class);
// Start building the WayangPlan.
@@ -254,7 +254,7 @@ object WordcountScala {
.withPlugin(Java.basicPlugin)
.withPlugin(Spark.basicPlugin)
val planBuilder = new PlanBuilder(wayangContext)
- .withJobName(s"WordCount ($inputUrl)")
+ .withJobName(s"org.apache.wayang.examples.WordCount ($inputUrl)")
.withUdfJarsOf(this.getClass)
val wordcounts = planBuilder
diff --git
a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaKafkaTopicSink.java
b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaKafkaTopicSink.java
index 23535307..8ca09676 100644
---
a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaKafkaTopicSink.java
+++
b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaKafkaTopicSink.java
@@ -48,19 +48,24 @@ import java.util.List;
import java.util.Optional;
import java.util.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Implementation fo the {@link KafkaTopicSink} for the {@link JavaPlatform}.
*/
public class JavaKafkaTopicSink<T> extends KafkaTopicSink<T> implements
JavaExecutionOperator {
+ private static final Logger logger =
LoggerFactory.getLogger(JavaKafkaTopicSink.class);
+
public JavaKafkaTopicSink(String topicName, TransformationDescriptor<T,
String> formattingDescriptor) {
super(topicName, formattingDescriptor);
- System.out.println("---> CREATE JavaKafkaTopicSink ... (2)");
+ logger.info("---> CREATE JavaKafkaTopicSink ... (Option 2)");
}
public JavaKafkaTopicSink(KafkaTopicSink<T> that) {
super(that);
- System.out.println("---> CREATE JavaKafkaTopicSink ... (1)");
+ logger.info("---> CREATE JavaKafkaTopicSink ... (Option 1)");
}
@Override
@@ -72,19 +77,17 @@ public class JavaKafkaTopicSink<T> extends
KafkaTopicSink<T> implements JavaExec
assert inputs.length == 1;
assert outputs.length == 0;
- System.out.println("---> WRITE TO KAFKA SINK...");
+ logger.info("---> WRITE TO KAFKA SINK...");
- System.out.println("### 9 ... ");
+ logger.info("### 9 ... ");
JavaChannelInstance input = (JavaChannelInstance) inputs[0];
initProducer( (KafkaTopicSink<T>) this );
- // File f = new File( "./" + this.topicName + ".txt" );
-
final Function<T, String> formatter =
javaExecutor.getCompiler().compile(this.formattingDescriptor);
- System.out.println("### 10 ... ");
+ logger.info("### 10 ... ");
try ( KafkaProducer<String,String> producer = getProducer() ) {
input.<T>provideStream().forEach(
@@ -103,10 +106,10 @@ public class JavaKafkaTopicSink<T> extends
KafkaTopicSink<T> implements JavaExec
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// Handle any exceptions thrown during send
- System.err.println("Failed to send
message: " + exception.getMessage());
+ logger.error("Failed to send message: " +
exception.getMessage());
} else {
// Optionally handle successful send, log
metadata, etc.
- System.out.println("Message sent
successfully to " + metadata.topic() + " partition " + metadata.partition());
+ logger.info("Message sent successfully to
" + metadata.topic() + " partition " + metadata.partition());
}
});
} catch (Exception ex) {
@@ -118,7 +121,7 @@ public class JavaKafkaTopicSink<T> extends
KafkaTopicSink<T> implements JavaExec
throw new WayangException("Writing to Kafka topic failed.", e);
}
- System.out.println("### 11 ... ");
+ logger.info("### 11 ... ");
return ExecutionOperator.modelEagerExecution(inputs, outputs,
operatorContext);
}
diff --git
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSinkTest.java
similarity index 51%
copy from
wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
copy to
wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSinkTest.java
index 502d9ef6..331c63d0 100644
---
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
+++
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSinkTest.java
@@ -33,6 +33,9 @@ import org.apache.wayang.java.channels.StreamChannel;
import org.apache.wayang.java.execution.JavaExecutor;
import org.apache.wayang.java.platform.JavaPlatform;
+import org.apache.wayang.basic.operators.KafkaTopicSource;
+
+
import java.io.File;
import java.io.IOException;
import java.net.URI;
@@ -46,14 +49,20 @@ import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import java.util.Properties;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * Test suite for {@link JavaTextFileSink}.
+ * Test suite for {@link JavaKafkaTopicSink}.
*/
-public class JavaTextFileSinkTest extends JavaExecutionOperatorTestBase {
+public class JavaKafkaTopicSinkTest extends JavaExecutionOperatorTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(JavaTextFileSinkTest.class);
private Locale defaultLocale;
@@ -73,37 +82,76 @@ public class JavaTextFileSinkTest extends
JavaExecutionOperatorTestBase {
Locale.setDefault(defaultLocale);
}
+
+
@Test
- public void testWritingLocalFile() throws IOException, URISyntaxException {
+ public void testWritingToKafkaTopic() throws Exception {
+
Configuration configuration = new Configuration();
- final File tempDir = LocalFileSystem.findTempDir();
- final String targetUrl = LocalFileSystem.toURL(new File(tempDir,
"testWritingLocalFile.txt"));
- JavaTextFileSink<Float> sink = new JavaTextFileSink<>(
- targetUrl,
- new TransformationDescriptor<>(
- f -> String.format("%.2f", f),
- Float.class, String.class
- )
- );
-
- Job job = mock(Job.class);
- when(job.getConfiguration()).thenReturn(configuration);
- final JavaExecutor javaExecutor = (JavaExecutor)
JavaPlatform.getInstance().createExecutor(job);
-
- StreamChannel.Instance inputChannelInstance = (StreamChannel.Instance)
StreamChannel.DESCRIPTOR
- .createChannel(mock(OutputSlot.class), configuration)
- .createInstance(javaExecutor,
mock(OptimizationContext.OperatorContext.class), 0);
- inputChannelInstance.accept(Stream.of(1.123f, -0.1f, 3f));
- evaluate(sink, new ChannelInstance[]{inputChannelInstance}, new
ChannelInstance[0]);
-
-
- final List<String> lines = Files.lines(Paths.get(new
URI(targetUrl))).collect(Collectors.toList());
- Assert.assertEquals(
- Arrays.asList("1.12", "-0.10", "3.00"),
- lines
- );
+ // NOTE(review): presumably needs a reachable Kafka broker (TODO confirm); 'props' is only printed below, never passed to the sink. We assume that we write back into the same cluster, to avoid
"external copies"...
+ Properties props = KafkaTopicSource.getDefaultProperties();
+
+ logger.info(">>> Test: testWriteIntoKafkaTopic()");
+
+ final String topicName1 = "banking-tx-small-csv";
+
+ logger.info("> 0 ... ");
+
+ logger.info( "*** [TOPIC-Name] " + topicName1 + " ***");
+
+ logger.info( "> Write to topic ... ");
+
+ logger.info("> 1 ... ");
+
+ props.list(System.out);
+
+ logger.info("> 2 ... ");
+
+ JavaExecutor javaExecutor = null;
+
+ try {
+
+ JavaKafkaTopicSink<Float> sink = new JavaKafkaTopicSink<>(
+ topicName1,
+ new TransformationDescriptor<>(
+ f -> String.format("%.2f", f),
+ Float.class, String.class
+ )
+ );
+
+ logger.info("> 3 ... ");
+
+ Job job = mock(Job.class);
+ when(job.getConfiguration()).thenReturn(configuration);
+ javaExecutor = (JavaExecutor)
JavaPlatform.getInstance().createExecutor(job);
+
+ StreamChannel.Instance inputChannelInstance =
(StreamChannel.Instance) StreamChannel.DESCRIPTOR
+ .createChannel(mock(OutputSlot.class), configuration)
+ .createInstance(javaExecutor,
mock(OptimizationContext.OperatorContext.class), 0);
+ inputChannelInstance.accept(Stream.of(1.123f, -0.1f, 3f));
+ evaluate(sink, new ChannelInstance[]{inputChannelInstance}, new
ChannelInstance[0]);
+
+ logger.info("> 4 ... ");
+
+ }
+ catch (Exception ex ) {
+
+ ex.printStackTrace();
+
+ logger.info("##5## ... ");
+
+ Assert.fail();
+
+ }
+
+ Assert.assertTrue( true );
+
+ logger.info("> *6*");
+
}
+
+
}
diff --git
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSourceTest.java
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSourceTest.java
index 2264284b..c78436ca 100644
---
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSourceTest.java
+++
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaKafkaTopicSourceTest.java
@@ -70,13 +70,13 @@ public class JavaKafkaTopicSourceTest extends
JavaExecutionOperatorTestBase {
Locale.setDefault(defaultLocale);
}
- // @Test
+ @Test
public void testA() throws Exception {
Assert.assertEquals(3, 3);
logger.info(">>> Test A");
}
- // @Test
+ @Test
public void testReadFromKafkaTopic() {
logger.info(">>> Test: testReadFromKafkaTopic()");
@@ -91,8 +91,7 @@ public class JavaKafkaTopicSourceTest extends
JavaExecutionOperatorTestBase {
logger.info("> 1 ... ");
- //Properties props = KafkaTopicSource.getDefaultProperties();
- Properties props = new Properties();
+ Properties props = KafkaTopicSource.getDefaultProperties();
logger.info("> 2 ... ");
diff --git
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
index 502d9ef6..d56d2416 100644
---
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
+++
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSinkTest.java
@@ -50,11 +50,16 @@ import java.util.stream.Stream;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Test suite for {@link JavaTextFileSink}.
*/
public class JavaTextFileSinkTest extends JavaExecutionOperatorTestBase {
+ private static final Logger logger =
LoggerFactory.getLogger(JavaTextFileSinkTest.class);
+
private Locale defaultLocale;
/**
@@ -70,6 +75,7 @@ public class JavaTextFileSinkTest extends
JavaExecutionOperatorTestBase {
@After
public void teardownTest() {
+
Locale.setDefault(defaultLocale);
}
@@ -78,7 +84,7 @@ public class JavaTextFileSinkTest extends
JavaExecutionOperatorTestBase {
Configuration configuration = new Configuration();
final File tempDir = LocalFileSystem.findTempDir();
- final String targetUrl = LocalFileSystem.toURL(new File(tempDir,
"testWritingLocalFile.txt"));
+ final String targetUrl = LocalFileSystem.toURL(new File(tempDir,
"testWritingLocalFile_2.txt"));
JavaTextFileSink<Float> sink = new JavaTextFileSink<>(
targetUrl,
new TransformationDescriptor<>(
@@ -106,4 +112,5 @@ public class JavaTextFileSinkTest extends
JavaExecutionOperatorTestBase {
}
+
}