http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml deleted file mode 100644 index 3661726..0000000 --- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml +++ /dev/null @@ -1,518 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-parent</artifactId> - <version>0.9-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-streaming-examples</artifactId> - <name>flink-streaming-examples</name> - - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-core</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-scala</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-connectors</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <!-- get default data from flink-java-examples package --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.9</version><!--$NO-MVN-MAN-VER$--> - <executions> - <execution> - <id>unpack</id> - <phase>prepare-package</phase> - <goals> - <goal>unpack</goal> - </goals> - <configuration> - <artifactItems> - <artifactItem> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java-examples</artifactId> - <version>${project.version}</version> - <type>jar</type> - <overWrite>false</overWrite> - <outputDirectory>${project.build.directory}/classes</outputDirectory> - <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes> - 
</artifactItem> - </artifactItems> - </configuration> - </execution> - </executions> - </plugin> - - <!-- self-contained jars for each example --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - - <executions> - - <!-- Iteration --> - <execution> - <id>Iteration</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>Iteration</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.iteration.IterateExample</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/iteration/*.class</include> - </includes> - </configuration> - </execution> - - <!-- IncrementalLearning --> - <execution> - <id>IncrementalLearning</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>IncrementalLearning</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/ml/*.class</include> - </includes> - </configuration> - </execution> - - <!-- Twitter --> - <execution> - <id>Twitter</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>Twitter</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.twitter.TwitterStream</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/twitter/*.class</include> - <include>org/apache/flink/streaming/examples/twitter/util/*.class</include> - </includes> - </configuration> - </execution> - - <!-- WindowJoin --> - <execution> - <id>WindowJoin</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>WindowJoin</classifier> - - <archive> - 
<manifestEntries> - <program-class>org.apache.flink.streaming.examples.join.WindowJoin</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/join/*.class</include> - </includes> - </configuration> - </execution> - - <!-- WordCountPOJO --> - <execution> - <id>WordCountPOJO</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>WordCountPOJO</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include> - <include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include> - <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include> - </includes> - </configuration> - </execution> - - <!-- WordCount --> - <execution> - <id>WordCount</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>WordCount</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.wordcount.WordCount</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include> - <include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include> - <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include> - </includes> - </configuration> - </execution> - - <!-- SocketTextStreamWordCount --> - <execution> - <id>SocketTextStreamWordCount</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>SocketTextStreamWordCount</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class> - </manifestEntries> - </archive> - - 
<includes> - <include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include> - <include>org/apache/flink/streaming/examples/wordcount/WordCount$Tokenizer.class</include> - </includes> - </configuration> - </execution> - - <!-- DeltaExract --> - <execution> - <id>DeltaExract</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>DeltaExract</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.windowing.DeltaExtractExample</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample.class</include> - <include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample$*.class</include> - </includes> - </configuration> - </execution> - - <!-- MultiplePolicies --> - <execution> - <id>MultiplePolicies</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>MultiplePolicies</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.windowing.MultiplePoliciesExample</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.class</include> - <include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample$*.class</include> - </includes> - </configuration> - </execution> - - <!-- SlidingExample --> - <execution> - <id>SlidingExample</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>SlidingExample</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.windowing.SlidingExample</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/windowing/SlidingExample.class</include> - 
<include>org/apache/flink/streaming/examples/windowing/SlidingExample$*.class</include> - </includes> - </configuration> - </execution> - - <!-- TimeWindowing --> - <execution> - <id>TimeWindowing</id> - <phase>package</phase> - <goals> - <goal>jar</goal> - </goals> - <configuration> - <classifier>TimeWindowing</classifier> - - <archive> - <manifestEntries> - <program-class>org.apache.flink.streaming.examples.windowing.TimeWindowingExample</program-class> - </manifestEntries> - </archive> - - <includes> - <include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample.class</include> - <include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample$*.class</include> - </includes> - </configuration> - </execution> - </executions> - </plugin> - -<!-- Scala Compiler --> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <version>3.1.4</version> - <executions> - <!-- Run scala compiler in the process-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) compile phase --> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - - <!-- Run scala compiler in the process-test-resources phase, so that dependencies on - scala classes can be resolved later in the (Java) test-compile phase --> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <jvmArgs> - <jvmArg>-Xms128m</jvmArg> - <jvmArg>-Xmx512m</jvmArg> - </jvmArgs> - <compilerPlugins> - <compilerPlugin> - <groupId>org.scalamacros</groupId> - <artifactId>paradise_${scala.version}</artifactId> - <version>${scala.macros.version}</version> - </compilerPlugin> - </compilerPlugins> - </configuration> - </plugin> - - <!-- Eclipse Integration --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - 
<artifactId>maven-eclipse-plugin</artifactId> - <version>2.8</version> - <configuration> - <downloadSources>true</downloadSources> - <projectnatures> - <projectnature>org.scala-ide.sdt.core.scalanature</projectnature> - <projectnature>org.eclipse.jdt.core.javanature</projectnature> - </projectnatures> - <buildcommands> - <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand> - </buildcommands> - <classpathContainers> - <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer> - <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> - </classpathContainers> - <excludes> - <exclude>org.scala-lang:scala-library</exclude> - <exclude>org.scala-lang:scala-compiler</exclude> - </excludes> - <sourceIncludes> - <sourceInclude>**/*.scala</sourceInclude> - <sourceInclude>**/*.java</sourceInclude> - </sourceIncludes> - </configuration> - </plugin> - - <!-- Adding scala source directories to build path --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.7</version> - <executions> - <!-- Add src/main/scala to eclipse build path --> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>src/main/scala</source> - </sources> - </configuration> - </execution> - <!-- Add src/test/scala to eclipse build path --> - <execution> - <id>add-test-source</id> - <phase>generate-test-sources</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>src/test/scala</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.scalastyle</groupId> - <artifactId>scalastyle-maven-plugin</artifactId> - <version>0.5.0</version> - <executions> - <execution> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - <configuration> - <verbose>false</verbose> - 
<failOnViolation>true</failOnViolation> - <includeTestSourceDirectory>true</includeTestSourceDirectory> - <failOnWarning>false</failOnWarning> - <sourceDirectory>${basedir}/src/main/scala</sourceDirectory> - <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory> - <configLocation>${project.basedir}/../../../tools/maven/scalastyle-config.xml</configLocation> - <outputFile>${project.basedir}/scalastyle-output.xml</outputFile> - <outputEncoding>UTF-8</outputEncoding> - </configuration> - </plugin> - - </plugins> - - <pluginManagement> - <plugins> - <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.--> - <plugin> - <groupId>org.eclipse.m2e</groupId> - <artifactId>lifecycle-mapping</artifactId> - <version>1.0.0</version> - <configuration> - <lifecycleMappingMetadata> - <pluginExecutions> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <versionRange>[2.9,)</versionRange> - <goals> - <goal>unpack</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - </pluginExecutions> - </lifecycleMappingMetadata> - </configuration> - </plugin> - </plugins> - </pluginManagement> - - </build> - -</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java deleted file mode 100644 index 998e818..0000000 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.examples.iteration; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.collector.OutputSelector; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.IterativeDataStream; -import org.apache.flink.streaming.api.datastream.SplitDataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -/** - * Example illustrating iterations in Flink streaming. - * - * <p> - * The program sums up random numbers and counts additions it performs to reach - * a specific threshold in an iterative streaming fashion. - * </p> - * - * <p> - * This example shows how to use: - * <ul> - * <li>streaming iterations, - * <li>buffer timeout to enhance latency, - * <li>directed outputs. 
- * </ul> - */ -public class IterateExample { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - // set up input for the stream of (0,0) pairs - List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>(); - for (int i = 0; i < 1000; i++) { - input.add(new Tuple2<Double, Integer>(0., 0)); - } - - // obtain execution environment and set setBufferTimeout(1) to enable - // continuous flushing of the output buffers (lowest latency) - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() - .setBufferTimeout(1); - - // create an iterative data stream from the input with 5 second timeout - IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle() - .iterate(5000); - - // apply the step function to add new random value to the tuple and to - // increment the counter and split the output with the output selector - SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).split(new MySelector()); - - // close the iteration by selecting the tuples that were directed to the - // 'iterate' channel in the output selector - it.closeWith(step.select("iterate")); - - // to produce the final output select the tuples directed to the - // 'output' channel then project it to the desired second field - - DataStream<Tuple1<Integer>> numbers = step.select("output").project(1).types(Integer.class); - - // emit result - if (fileOutput) { - numbers.writeAsText(outputPath, 1); - } else { - numbers.print(); - } - - // execute the program - env.execute("Streaming Iteration Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * 
Iteration step function which takes an input (Double , Integer) and - * produces an output (Double + random, Integer + 1). - */ - public static class Step extends - RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> { - private static final long serialVersionUID = 1L; - private transient Random rnd; - - public void open(Configuration parameters) { - rnd = new Random(); - } - - @Override - public Tuple2<Double, Integer> map(Tuple2<Double, Integer> value) throws Exception { - return new Tuple2<Double, Integer>(value.f0 + rnd.nextDouble(), value.f1 + 1); - } - } - - /** - * OutputSelector testing which tuple needs to be iterated again. - */ - public static class MySelector implements OutputSelector<Tuple2<Double, Integer>> { - private static final long serialVersionUID = 1L; - - @Override - public Iterable<String> select(Tuple2<Double, Integer> value) { - List<String> output = new ArrayList<String>(); - if (value.f0 > 100) { - output.add("output"); - } else { - output.add("iterate"); - } - return output; - } - - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 1) { - outputPath = args[0]; - } else { - System.err.println("Usage: IterateExample <result path>"); - return false; - } - } else { - System.out.println("Executing IterateExample with generated data."); - System.out.println(" Provide parameter to write to file."); - System.out.println(" Usage: IterateExample <result path>"); - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java 
---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java deleted file mode 100644 index dcfed50..0000000 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.examples.join; - -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.source.RichSourceFunction; -import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.util.Collector; - -/** - * Example illustrating join over sliding windows of streams in Flink. - * - * <p> - * This example will join two streams with a sliding window. One which emits - * grades and one which emits salaries of people. - * </p> - * - * <p> - * This example shows how to: - * <ul> - * <li>do windowed joins, - * <li>use tuple data types, - * <li>write a simple streaming program. 
- */ -public class WindowJoin { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - // obtain execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // connect to the data sources for grades and salaries - DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource()); - DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource()); - - // apply a temporal join over the two stream based on the names over one - // second windows - DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades - .join(salaries) - .onWindow(1, TimeUnit.SECONDS) - .where(0) - .equalTo(0) - .with(new MyJoinFunction()); - - // emit result - if (fileOutput) { - joinedStream.writeAsText(outputPath, 1); - } else { - joinedStream.print(); - } - - // execute program - env.execute("Windowed Join Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" }; - private final static int GRADE_COUNT = 5; - private final static int SALARY_MAX = 10000; - private final static int SLEEP_TIME = 10; - - /** - * Continuously emit tuples with random names and integers (grades). 
- */ - public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> { - private static final long serialVersionUID = 1L; - - private Random rand; - private Tuple2<String, Integer> outTuple; - - public GradeSource() { - rand = new Random(); - outTuple = new Tuple2<String, Integer>(); - } - - @Override - public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception { - while (true) { - outTuple.f0 = names[rand.nextInt(names.length)]; - outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1; - out.collect(outTuple); - Thread.sleep(rand.nextInt(SLEEP_TIME) + 1); - } - } - } - - /** - * Continuously emit tuples with random names and integers (salaries). - */ - public static class SalarySource extends RichSourceFunction<Tuple2<String, Integer>> { - private static final long serialVersionUID = 1L; - - private transient Random rand; - private transient Tuple2<String, Integer> outTuple; - - public void open(Configuration parameters) throws Exception { - super.open(parameters); - rand = new Random(); - outTuple = new Tuple2<String, Integer>(); - } - - @Override - public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception { - while (true) { - outTuple.f0 = names[rand.nextInt(names.length)]; - outTuple.f1 = rand.nextInt(SALARY_MAX) + 1; - out.collect(outTuple); - Thread.sleep(rand.nextInt(SLEEP_TIME) + 1); - } - } - } - - public static class MyJoinFunction - implements - JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> { - - private static final long serialVersionUID = 1L; - - private Tuple3<String, Integer, Integer> joined = new Tuple3<String, Integer, Integer>(); - - @Override - public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, - Tuple2<String, Integer> second) throws Exception { - joined.f0 = first.f0; - joined.f1 = first.f1; - joined.f2 = second.f1; - return joined; - } - } - - // ************************************************************************* - // 
UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 1) { - outputPath = args[0]; - } else { - System.err.println("Usage: WindowJoin <result path>"); - return false; - } - } else { - System.out.println("Executing WindowJoin with generated data."); - System.out.println(" Provide parameter to write to file."); - System.out.println(" Usage: WindowJoin <result path>"); - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java deleted file mode 100755 index 375c86d..0000000 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.ml; - -import java.util.concurrent.TimeUnit; - -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.co.CoMapFunction; -import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.helper.Time; -import org.apache.flink.util.Collector; - -/** - * Skeleton for incremental machine learning algorithm consisting of a - * pre-computed model, which gets updated for the new inputs and new input data - * for which the job provides predictions. - * - * <p> - * This may serve as a base of a number of algorithms, e.g. updating an - * incremental Alternating Least Squares model while also providing the - * predictions. 
- * </p> - * - * <p> - * This example shows how to use: - * <ul> - * <li>Connected streams - * <li>CoFunctions - * <li>Tuple data types - * </ul> - */ -public class IncrementalLearningSkeleton { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // build new model on every 5 seconds of new data - DataStream<Double[]> model = env.addSource(new TrainingDataSource()) - .window(Time.of(5000, TimeUnit.MILLISECONDS)) - .reduceGroup(new PartialModelBuilder()); - - // use partial model for prediction - DataStream<Integer> prediction = env.addSource(new NewDataSource()).connect(model) - .map(new Predictor()); - - // emit result - if (fileOutput) { - prediction.writeAsText(outputPath, 1); - } else { - prediction.print(); - } - - // execute program - env.execute("Streaming Incremental Learning"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * Feeds new data for prediction. By default it is implemented as constantly - * emitting the Integer 1 in a loop. - */ - public static class NewDataSource implements SourceFunction<Integer> { - private static final long serialVersionUID = 1L; - private static final int NEW_DATA_SLEEP_TIME = 1000; - - @Override - public void invoke(Collector<Integer> collector) throws Exception { - while (true) { - collector.collect(getNewData()); - } - } - - private Integer getNewData() throws InterruptedException { - Thread.sleep(NEW_DATA_SLEEP_TIME); - return 1; - } - } - - /** - * Feeds new training data for the partial model builder. 
By default it is - * implemented as constantly emitting the Integer 1 in a loop. - */ - public static class TrainingDataSource implements SourceFunction<Integer> { - private static final long serialVersionUID = 1L; - private static final int TRAINING_DATA_SLEEP_TIME = 10; - - @Override - public void invoke(Collector<Integer> collector) throws Exception { - while (true) { - collector.collect(getTrainingData()); - } - - } - - private Integer getTrainingData() throws InterruptedException { - Thread.sleep(TRAINING_DATA_SLEEP_TIME); - return 1; - - } - } - - /** - * Builds up-to-date partial models on new training data. - */ - public static class PartialModelBuilder implements GroupReduceFunction<Integer, Double[]> { - private static final long serialVersionUID = 1L; - - protected Double[] buildPartialModel(Iterable<Integer> values) { - return new Double[] { 1. }; - } - - @Override - public void reduce(Iterable<Integer> values, Collector<Double[]> out) throws Exception { - out.collect(buildPartialModel(values)); - } - } - - /** - * Creates prediction using the model produced in batch-processing and the - * up-to-date partial model. - * - * <p> - * By defaults emits the Integer 0 for every prediction and the Integer 1 - * for every model update. - * </p> - */ - public static class Predictor implements CoMapFunction<Integer, Double[], Integer> { - private static final long serialVersionUID = 1L; - - Double[] batchModel = null; - Double[] partialModel = null; - - @Override - public Integer map1(Integer value) { - // Return prediction - return predict(value); - } - - @Override - public Integer map2(Double[] value) { - // Update model - partialModel = value; - batchModel = getBatchModel(); - return 1; - } - - // pulls model built with batch-job on the old training data - protected Double[] getBatchModel() { - return new Double[] { 0. 
}; - } - - // performs prediction using the two models - protected Integer predict(Integer inTuple) { - return 0; - } - - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 1) { - outputPath = args[0]; - } else { - System.err.println("Usage: IncrementalLearningSkeleton <result path>"); - return false; - } - } else { - System.out.println("Executing IncrementalLearningSkeleton with generated data."); - System.out.println(" Provide parameter to write to file."); - System.out.println(" Usage: IncrementalLearningSkeleton <result path>"); - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java deleted file mode 100644 index e9b60f4..0000000 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.socket; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer; - -/** - * This example shows an implementation of WordCount with data from a text - * socket. To run the example make sure that the service providing the text data - * is already up and running. - * - * <p> - * To start an example socket text stream on your local machine run netcat from - * a command line: <code>nc -lk 9999</code>, where the parameter specifies the - * port number. - * - * - * <p> - * Usage: - * <code>SocketTextStreamWordCount <hostname> <port> <result path></code> - * <br> - * - * <p> - * This example shows how to: - * <ul> - * <li>use StreamExecutionEnvironment.socketTextStream - * <li>write a simple Flink program, - * <li>write and use user-defined functions. 
- * </ul> - * - * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a> - */ -public class SocketTextStreamWordCount { - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - // set up the execution environment - final StreamExecutionEnvironment env = StreamExecutionEnvironment - .getExecutionEnvironment(); - - // get input data - DataStream<String> text = env.socketTextStream(hostName, port); - - DataStream<Tuple2<String, Integer>> counts = - // split up the lines in pairs (2-tuples) containing: (word,1) - text.flatMap(new Tokenizer()) - // group by the tuple field "0" and sum up tuple field "1" - .groupBy(0) - .sum(1); - - if (fileOutput) { - counts.writeAsText(outputPath, 1); - } else { - counts.print(); - } - - // execute program - env.execute("WordCount from SocketTextStream Example"); - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String hostName; - private static int port; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - - // parse input arguments - if (args.length == 3) { - fileOutput = true; - hostName = args[0]; - port = Integer.valueOf(args[1]); - outputPath = args[2]; - } else if (args.length == 2) { - hostName = args[0]; - port = Integer.valueOf(args[1]); - } else { - System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]"); - return false; - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java ---------------------------------------------------------------------- diff --git 
a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java deleted file mode 100644 index 1901475..0000000 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.examples.twitter; - -import java.util.StringTokenizer; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.json.JSONParseFlatMap; -import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData; -import org.apache.flink.util.Collector; -import org.apache.sling.commons.json.JSONException; - -/** - * Implements the "TwitterStream" program that computes a most used word - * occurrence over JSON files in a streaming fashion. - * - * <p> - * The input is a JSON text file with lines separated by newline characters. - * - * <p> - * Usage: <code>TwitterStream <text path></code><br> - * If no parameters are provided, the program is run with default data from - * {@link TwitterStreamData}. - * - * <p> - * This example shows how to: - * <ul> - * <li>acquire external data, - * <li>use in-line defined functions, - * <li>handle flattened stream inputs. 
- * </ul> - * - */ -public class TwitterStream { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - if (!parseParameters(args)) { - return; - } - - // set up the execution environment - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - env.setBufferTimeout(1000); - - // get input data - DataStream<String> streamSource = getTextDataStream(env); - - DataStream<Tuple2<String, Integer>> tweets = streamSource - // selecting English tweets and splitting to words - .flatMap(new SelectEnglishAndTokenizeFlatMap()) - // returning (word, 1) - .map(new MapFunction<String, Tuple2<String, Integer>>() { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, Integer> map(String value) throws Exception { - return new Tuple2<String, Integer>(value, 1); - } - }) - // group by words and sum their occurence - .groupBy(0).sum(1) - // select word with maximum occurence - .flatMap(new SelectMaxOccurence()); - - // emit result - if (fileOutput) { - tweets.writeAsText(outputPath, 1); - } else { - tweets.print(); - } - - // execute program - env.execute("Twitter Streaming Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * Makes sentences from English tweets. - * - * <p> - * Implements a string tokenizer that splits sentences into words as a - * user-defined FlatMapFunction. The function takes a line (String) and - * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String, - * Integer>). 
- * </p> - */ - public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, String> { - private static final long serialVersionUID = 1L; - - /** - * Select the language from the incoming JSON text - */ - @Override - public void flatMap(String value, Collector<String> out) throws Exception { - try { - if (getString(value, "lang").equals("en")) { - // message of tweet - StringTokenizer tokenizer = new StringTokenizer(getString(value, "text")); - - // split the message - while (tokenizer.hasMoreTokens()) { - String result = tokenizer.nextToken().replaceAll("\\s*", ""); - - if (result != null && !result.equals("")) { - out.collect(result); - } - } - } - } catch (JSONException e) { - - } - } - } - - /** - * Implements a user-defined FlatMapFunction that checks if the current - * occurence is higher than the maximum occurence. If so, returns the word - * and changes the maximum. - */ - public static class SelectMaxOccurence implements - FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> { - private static final long serialVersionUID = 1L; - private Integer maximum; - - public SelectMaxOccurence() { - this.maximum = 0; - } - - @Override - public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) - throws Exception { - if ((Integer) value.getField(1) >= maximum) { - out.collect(value); - maximum = (Integer) value.getField(1); - } - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String textPath; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - if (args.length > 0) { - // parse input arguments - fileOutput = true; - if (args.length == 2) { - textPath = args[0]; - outputPath = args[1]; - } else { - System.err.println("USAGE:\nTwitterStream 
<pathToPropertiesFile> <result path>"); - return false; - } - } else { - System.out.println("Executing TwitterStream example with built-in default data."); - System.out.println(" Provide parameters to read input data from a file."); - System.out.println(" USAGE: TwitterStream <pathToPropertiesFile>"); - } - return true; - } - - private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) { - if (fileOutput) { - // read the text file from given input path - return env.readTextFile(textPath); - } else { - // get default test text data - return env.fromElements(TwitterStreamData.TEXTS); - } - } -}