Flink example
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/commit/4491cfe1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/tree/4491cfe1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/diff/4491cfe1 Branch: refs/heads/master Commit: 4491cfe1d0bf7324073537e89e7e8b6ed8ab43d5 Parents: b3429dd Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Mon Sep 26 12:43:22 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Mon Sep 26 12:43:22 2016 -0500 ---------------------------------------------------------------------- flink/flink-twitter-collection/README.md | 8 + flink/flink-twitter-collection/pom.xml | 420 ++++++++ .../jsonschema/FlinkBatchConfiguration.json | 12 + .../jsonschema/FlinkStreamingConfiguration.json | 40 + .../jsonschema/StreamsFlinkConfiguration.json | 48 + .../TwitterFollowingPipelineConfiguration.json | 29 + .../TwitterPostsPipelineConfiguration.json | 29 + ...terUserInformationPipelineConfiguration.json | 29 + .../streams/examples/flink/FlinkBase.scala | 200 ++++ .../FlinkTwitterFollowingPipeline.scala | 149 +++ .../collection/FlinkTwitterPostsPipeline.scala | 165 +++ .../FlinkTwitterUserInformationPipeline.scala | 163 +++ .../markdown/FlinkTwitterFollowingPipeline.md | 41 + .../site/markdown/FlinkTwitterPostsPipeline.md | 41 + .../FlinkTwitterUserInformationPipeline.md | 41 + .../src/site/markdown/index.md | 32 + .../site/resources/FlinkBatchConfiguration.json | 12 + .../resources/FlinkStreamingConfiguration.json | 40 + .../resources/StreamsFlinkConfiguration.json | 48 + .../TwitterFollowingBatchConfiguration.json | 23 + .../TwitterFollowingPipelineConfiguration.json | 29 + .../TwitterPostsBatchConfiguration.json | 23 + .../TwitterPostsPipelineConfiguration.json | 29 + ...witterUserInformationBatchConfiguration.json | 23 + ...terUserInformationPipelineConfiguration.json | 29 + .../src/test/resources/1000twitterids.txt | 1000 ++++++++++++++++++ .../FlinkTwitterFollowingPipeline.conf | 10 + .../resources/FlinkTwitterPostsPipeline.conf | 10 + .../FlinkTwitterUserInformationPipeline.conf | 10 + .../src/test/resources/asf.txt | 1 + .../test/FlinkTwitterFollowingPipelineIT.scala | 81 ++ .../test/FlinkTwitterPostsPipelineIT.scala | 55 + .../FlinkTwitterUserInformationPipelineIT.scala | 56 + flink/pom.xml | 47 + pom.xml | 29 +- 35 files changed, 2988 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/README.md ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/README.md b/flink/flink-twitter-collection/README.md new file mode 100644 index 0000000..f9fe687 --- /dev/null +++ b/flink/flink-twitter-collection/README.md @@ -0,0 +1,8 @@ +Apache Streams (incubating) +Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0 +-------------------------------------------------------------------------------- + +org.apache.streams:flink-twitter-collection +=========================================== + +[README.md](src/site/markdown/index.md "README") http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/pom.xml ---------------------------------------------------------------------- diff --git 
a/flink/flink-twitter-collection/pom.xml b/flink/flink-twitter-collection/pom.xml new file mode 100644 index 0000000..33b05fe --- /dev/null +++ b/flink/flink-twitter-collection/pom.xml @@ -0,0 +1,420 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.streams</groupId> + <artifactId>streams-examples-flink</artifactId> + <version>0.4-incubating-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-twitter-collection</artifactId> + <name>flink-twitter-collection</name> + + <description>Collects twitter documents using flink.</description> + + <properties> + <docker.repo>apachestreams</docker.repo> + <hdfs.version>2.7.0</hdfs.version> + <flink.version>1.1.2</flink.version> + <scala.suffix>2.10</scala.suffix> + </properties> + + <dependencies> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-core</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.typesafe</groupId> + <artifactId>config</artifactId> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-config</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-util</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-pojo</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-provider-twitter</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.streams</groupId> + <artifactId>streams-persist-hdfs</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hdfs.version}</version> + 
<exclusions> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_${scala.suffix}</artifactId> + <version>${flink.version}</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_${scala.suffix}</artifactId> + <version>${flink.version}</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-filesystem_2.10</artifactId> + <version>${flink.version}</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${scala.suffix}</artifactId> + <version>${flink.version}</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-metrics-core</artifactId> + <version>${flink.version}</version> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + 
<artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>${logback.version}</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + <version>${logback.version}</version> + </dependency> + </dependencies> + + <build> + <sourceDirectory>src/main/scala</sourceDirectory> + <testSourceDirectory>src/test/scala</testSourceDirectory> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + <plugins> + <plugin> + <artifactId>maven-clean-plugin</artifactId> + <configuration> + <filesets> + <fileset> + <directory>data</directory> + <followSymlinks>false</followSymlinks> + </fileset> + </filesets> + </configuration> + </plugin> + <!-- This binary runs with logback --> + <!-- Keep log4j out --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.3.1</version> + <executions> + <execution> + <id>enforce-banned-dependencies</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <bannedDependencies> + <excludes> + <exclude>org.slf4j:slf4j-log4j12</exclude> + <exclude>org.slf4j:slf4j-jcl</exclude> + <exclude>org.slf4j:slf4j-jdk14</exclude> + <exclude>org.log4j:log4j</exclude> + <exclude>commons-logging:commons-logging</exclude> + </excludes> + </bannedDependencies> + </rules> + <fail>true</fail> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-maven-plugin</artifactId> + <version>0.4.1</version> + <configuration> + <addCompileSourceRoot>true</addCompileSourceRoot> + <generateBuilders>true</generateBuilders> + <sourcePaths> + <sourcePath>src/main/jsonschema</sourcePath> + </sourcePaths> + <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory> + <targetPackage>org.apache.streams.example.elasticsearch</targetPackage> + <useJodaDates>false</useJodaDates> + </configuration> + <executions> + <execution> + <goals> + <goal>generate</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/jsonschema2pojo</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + 
<groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <includes>**/*.json</includes> + <outputDirectory>${project.build.directory}/test-classes</outputDirectory> + <includeGroupIds>org.apache.streams</includeGroupIds> + <includeTypes>test-jar</includeTypes> + </configuration> + <executions> + <execution> + <id>test-resource-dependencies</id> + <phase>process-test-resources</phase> + <goals> + <goal>unpack-dependencies</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>2.12.4</version> + <executions> + <execution> + <id>integration-tests</id> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>io.fabric8</groupId> + <artifactId>docker-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json new file mode 100644 index 0000000..30a2942 --- /dev/null +++ b/flink/flink-twitter-collection/src/main/jsonschema/FlinkBatchConfiguration.json @@ -0,0 +1,12 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "type": "object", + "javaType" : "org.apache.streams.flink.FlinkBatchConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": { + "$ref": "StreamsFlinkConfiguration.json" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json new file mode 100644 index 0000000..0d63f4e --- /dev/null +++ b/flink/flink-twitter-collection/src/main/jsonschema/FlinkStreamingConfiguration.json @@ -0,0 +1,40 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "type": "object", + "javaType" : "org.apache.streams.flink.FlinkStreamingConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": { + "$ref": "StreamsFlinkConfiguration.json" + }, + "properties": { + "parallel": { + "type": "integer", + "default": 1 + }, + "providerWaitMs": { + "type": "integer", + "default": 1000 + }, + "checkpointIntervalMs": { + "type": "integer", + "default": 300000 + }, + "checkpointTimeoutMs": { + "type": "integer", + "default": 30000 + }, + "restartAttempts": { + "type": "integer", + "description": "number of restart attempts", + "default": 3 + }, + "restartDelayMs": { + "type": "integer", + "description": "delay in milliseconds", + "default": 10000 + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json 
---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json new file mode 100644 index 0000000..ef78357 --- /dev/null +++ b/flink/flink-twitter-collection/src/main/jsonschema/StreamsFlinkConfiguration.json @@ -0,0 +1,48 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "type": "object", + "javaType" : "org.apache.streams.flink.StreamsFlinkConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": { + "$ref": "http://streams.peoplepattern.com/incubator-streams/0.3.9-PP-SNAPSHOT/streams-config/StreamsConfiguration.json" + }, + "properties": { + "parallel": { + "type": "integer", + "default": 1 + }, + "providerWaitMs": { + "type": "integer", + "default": 1000 + }, + "checkpointIntervalMs": { + "type": "integer", + "default": 300000 + }, + "checkpointTimeoutMs": { + "type": "integer", + "default": 30000 + }, + "test": { + "type": "boolean", + "default": false + }, + "local": { + "type": "boolean", + "default": true + }, + "restartAttempts": { + "type": "integer", + "description": "number of restart attempts", + "default": 3 + }, + "restartDelayMs": { + "type": "integer", + "description": "delay in milliseconds", + "default": 10000 + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json new file mode 100644 index 0000000..de4f9bb --- /dev/null +++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterFollowingPipelineConfiguration.json @@ -0,0 +1,29 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "type": "object", + "javaType" : "org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": { + "$ref": "FlinkStreamingConfiguration.json" + }, + "properties": { + "twitter": { + "type": "object", + "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration" + }, + "source": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration" + }, + "destination": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration" + }, + "providerWaitMs": { + "type": "integer" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json new file mode 100644 index 0000000..628d7ee --- /dev/null +++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterPostsPipelineConfiguration.json @@ -0,0 +1,29 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + 
"http://www.apache.org/licenses/LICENSE-2.0" + ], + "type": "object", + "javaType" : "org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": { + "$ref": "FlinkStreamingConfiguration.json" + }, + "properties": { + "twitter": { + "type": "object", + "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration" + }, + "source": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration" + }, + "destination": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration" + }, + "providerWaitMs": { + "type": "integer" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json b/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json new file mode 100644 index 0000000..5261748 --- /dev/null +++ b/flink/flink-twitter-collection/src/main/jsonschema/TwitterUserInformationPipelineConfiguration.json @@ -0,0 +1,29 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "type": "object", + "javaType" : "org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": { + "$ref": "FlinkStreamingConfiguration.json" + }, + "properties": { + "twitter": { + "type": "object", + "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration" + }, + "source": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration" + }, + "destination": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration" + }, + "providerWaitMs": { + "type": "integer" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala new file mode 100644 index 0000000..1f1ed6d --- /dev/null +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala @@ -0,0 +1,200 @@ +package org.apache.streams.examples.flink + +import java.net.MalformedURLException + +import com.google.common.base.Strings +import com.typesafe.config.Config +import org.apache.flink.api.common.restartstrategy.RestartStrategies +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.streaming.api.CheckpointingMode +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} +import org.apache.streams.flink.{FlinkBatchConfiguration, FlinkStreamingConfiguration, StreamsFlinkConfiguration} +import org.apache.streams.hdfs.{HdfsConfiguration, HdfsReaderConfiguration, HdfsWriterConfiguration} +import org.apache.streams.jackson.StreamsJacksonMapper +import 
org.slf4j.LoggerFactory + +trait FlinkBase { + + private val BASELOGGER = LoggerFactory.getLogger("FlinkBase") + private val MAPPER = StreamsJacksonMapper.getInstance() + + var configUrl : String = _ + var typesafe : Config = _ + var streamsConfig = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig) + var streamsFlinkConfiguration: StreamsFlinkConfiguration = _ + + var executionEnvironment: ExecutionEnvironment = _ + var streamExecutionEnvironment: StreamExecutionEnvironment = _ + + /* + Basic stuff for every flink job + */ + def main(args: Array[String]): Unit = { + // if only one argument, use it as the config URL + if( args.size > 0 ) { + BASELOGGER.info("Args: {}", args) + configUrl = args(0) + setup(configUrl) + } + + } + + def setup(configUrl : String): Boolean = { + BASELOGGER.info("StreamsConfigurator.config: {}", StreamsConfigurator.config) + if( !Strings.isNullOrEmpty(configUrl)) { + BASELOGGER.info("StreamsConfigurator.resolveConfig(configUrl): {}", StreamsConfigurator.resolveConfig(configUrl)) + try { + typesafe = StreamsConfigurator.resolveConfig(configUrl).withFallback(StreamsConfigurator.config).resolve() + } catch { + case mue: MalformedURLException => { + BASELOGGER.error("Invalid Configuration URL: ", mue) + return false + } + case e: Exception => { + BASELOGGER.error("Invalid Configuration URL: ", e) + return false + } + } + } + else { + typesafe = StreamsConfigurator.getConfig + } + + return setup(typesafe) + + } + + def setup(typesafe : Config): Boolean = { + this.typesafe = typesafe + + BASELOGGER.info("Typesafe Config: {}", typesafe) + + if( this.typesafe.getString("mode").equals("streaming")) { + val streamingConfiguration: FlinkStreamingConfiguration = + new ComponentConfigurator[FlinkStreamingConfiguration](classOf[FlinkStreamingConfiguration]).detectConfiguration(typesafe) + return setupStreaming(streamingConfiguration) + } else if( this.typesafe.getString("mode").equals("batch")) { + val batchConfiguration: FlinkBatchConfiguration = + new ComponentConfigurator[FlinkBatchConfiguration](classOf[FlinkBatchConfiguration]).detectConfiguration(typesafe) + return setupBatch(batchConfiguration) + } else { + return false; + } + } + +// def setup(typesafe: Config): Boolean = { +// +// val streamsConfig = StreamsConfigurator.detectConfiguration(typesafe) +// +// this.streamsConfig = streamsConfig +// +// BASELOGGER.info("Streams Config: " + streamsConfig) +// +// setup(streamsConfig) +// } + + def setupStreaming(streamingConfiguration: FlinkStreamingConfiguration): Boolean = { + + BASELOGGER.info("FsStreamingFlinkConfiguration: " + streamingConfiguration) + + this.streamsFlinkConfiguration = streamingConfiguration + + if( streamsFlinkConfiguration == null) return false + + if( streamExecutionEnvironment == null ) + streamExecutionEnvironment = streamEnvironment(streamingConfiguration) + + return false + + } + + def setupBatch(batchConfiguration: FlinkBatchConfiguration): Boolean = { + + BASELOGGER.info("FsBatchFlinkConfiguration: " + batchConfiguration) + + this.streamsFlinkConfiguration = batchConfiguration + + if( streamsFlinkConfiguration == null) return false + + if( executionEnvironment == null ) + executionEnvironment = batchEnvironment(batchConfiguration) + + return true + + } + + def batchEnvironment(config: FlinkBatchConfiguration = new FlinkBatchConfiguration()) : ExecutionEnvironment = { + if (config.getTest == false && config.getLocal == false) { + val env = ExecutionEnvironment.getExecutionEnvironment + return env + } else { + val env = 
ExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt) + return env + } + } + + def streamEnvironment(config: FlinkStreamingConfiguration = new FlinkStreamingConfiguration()) : StreamExecutionEnvironment = { + if( config.getTest == false && config.getLocal == false) { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + env.setRestartStrategy(RestartStrategies.noRestart()); + + // start a checkpoint every hour + env.enableCheckpointing(config.getCheckpointIntervalMs) + + env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) + + // checkpoints have to complete within five minutes, or are discarded + env.getCheckpointConfig.setCheckpointTimeout(config.getCheckpointTimeoutMs) + + // allow only one checkpoint to be in progress at the same time + env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) + + return env + } + + else return StreamExecutionEnvironment.createLocalEnvironment(config.getParallel.toInt) + } + + def buildReaderPath(configObject: HdfsReaderConfiguration) : String = { + var inPathBuilder : String = "" + if (configObject.getScheme.equals(HdfsConfiguration.Scheme.FILE)) { + inPathBuilder = configObject.getPath + "/" + configObject.getReaderPath + } + else if (configObject.getScheme.equals(HdfsConfiguration.Scheme.HDFS)) { + inPathBuilder = configObject.getScheme + "://" + configObject.getHost + ":" + configObject.getPort + "/" + configObject.getPath + "/" + configObject.getReaderPath + } + else if (configObject.getScheme.toString.equals("s3")) { + inPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getReaderPath + } else { + throw new Exception("scheme not recognized: " + configObject.getScheme) + } + return inPathBuilder + } + + def buildWriterPath(configObject: HdfsWriterConfiguration) : String = { + var outPathBuilder : String = "" + if( configObject.getScheme.equals(HdfsConfiguration.Scheme.FILE)) { + outPathBuilder = configObject.getPath + "/" + configObject.getWriterPath + } + else if( configObject.getScheme.equals(HdfsConfiguration.Scheme.HDFS)) { + outPathBuilder = configObject.getScheme + "://" + configObject.getHost + ":" + configObject.getPort + "/" + configObject.getPath + "/" + configObject.getWriterPath + } + else if( configObject.getScheme.toString.equals("s3")) { + outPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getWriterPath + } else { + throw new Exception("output scheme not recognized: " + configObject.getScheme) + } + return outPathBuilder + } + + def toProviderId(input : String) : String = { + if( input.startsWith("@") ) + return input.substring(1) + if( input.contains(':')) + return input.substring(input.lastIndexOf(':')+1) + else return input + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala new file mode 100644 index 0000000..2ac7d32 --- /dev/null +++ 
b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala @@ -0,0 +1,149 @@ +package org.apache.streams.examples.flink.twitter.collection + +import java.util.concurrent.TimeUnit + +import com.fasterxml.jackson.databind.ObjectMapper +import com.google.common.util.concurrent.Uninterruptibles +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.core.fs.FileSystem +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment} +import org.apache.flink.streaming.connectors.fs.RollingSink +import org.apache.flink.util.Collector +import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} +import org.apache.streams.core.StreamsDatum +import org.apache.streams.jackson.StreamsJacksonMapper +import org.apache.streams.twitter.TwitterFollowingConfiguration +import org.apache.streams.twitter.pojo.Follow +import org.apache.streams.twitter.provider.TwitterFollowingProvider +import org.slf4j.{Logger, LoggerFactory} +import org.apache.flink.api.scala._ +import org.apache.streams.examples.flink.FlinkBase +import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration +import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration} + +import scala.collection.JavaConversions._ + +/** + * Created by sblackmon on 4/20/16. + */ +/** + * Created by sblackmon on 3/15/16. + */ +object FlinkTwitterFollowingPipeline extends FlinkBase { + + val STREAMS_ID: String = "FlinkTwitterFollowingPipeline" + + private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline]) + private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance() + + override def main(args: Array[String]) = { + super.main(args) + val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe) + if( setup(jobConfig) == false ) System.exit(1) + val pipeline: FlinkTwitterFollowingPipeline = new FlinkTwitterFollowingPipeline(jobConfig) + val thread = new Thread(pipeline) + thread.start() + thread.join() + } + + def setup(jobConfig: TwitterFollowingPipelineConfiguration): Boolean = { + + LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig) + + if( jobConfig == null ) { + LOGGER.error("jobConfig is null!") + System.err.println("jobConfig is null!") + return false + } + + if( jobConfig.getSource == null ) { + LOGGER.error("jobConfig.getSource is null!") + System.err.println("jobConfig.getSource is null!") + return false + } + + if( jobConfig.getDestination == null ) { + LOGGER.error("jobConfig.getDestination is null!") + System.err.println("jobConfig.getDestination is null!") + return false + } + + if( jobConfig.getTwitter == null ) { + LOGGER.error("jobConfig.getTwitter is null!") + System.err.println("jobConfig.getTwitter is null!") + return false + } + + return true + + } + +} + +class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguration = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable { + + import FlinkTwitterFollowingPipeline._ + + override def run(): Unit = { + + val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, 
classOf[FlinkStreamingConfiguration])) + + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.setNumberOfExecutionRetries(0) + + val inPath = buildReaderPath(config.getSource) + + val outPath = buildWriterPath(config.getDestination) + + val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs ) + + // these datums contain 'Follow' objects + val followDatums: DataStream[StreamsDatum] = + keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10) + + val follows: DataStream[Follow] = followDatums + .map(datum => datum.getDocument.asInstanceOf[Follow]) + + val jsons: DataStream[String] = follows + .map(follow => { + val MAPPER = StreamsJacksonMapper.getInstance + MAPPER.writeValueAsString(follow) + }) + + if( config.getTest == false ) + jsons.addSink(new RollingSink[String](outPath)).setParallelism(3) + else + jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) + .setParallelism(env.getParallelism); + + // if( test == true ) jsons.print(); + + env.execute("FlinkTwitterFollowingPipeline") + } + + class FollowingCollectorFlatMapFunction( + twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator[TwitterFollowingConfiguration](classOf[TwitterFollowingConfiguration]).detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter")), + flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator[StreamsFlinkConfiguration](classOf[StreamsFlinkConfiguration]).detectConfiguration(StreamsConfigurator.getConfig) + ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable { + + override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = { + collectConnections(input, out) + } + + def collectConnections(id : String, out : Collector[StreamsDatum]) = { + val twitProvider: TwitterFollowingProvider = + new TwitterFollowingProvider( + twitterConfiguration.withIdsOnly(true).withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(5000l).asInstanceOf[TwitterFollowingConfiguration] + ) + twitProvider.prepare(twitProvider) + twitProvider.startStream() + var iterator: Iterator[StreamsDatum] = null + do { + Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS) + twitProvider.readCurrent().iterator().toList.map(out.collect(_)) + } while( twitProvider.isRunning ) + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala new file mode 100644 index 0000000..f8e221c --- /dev/null +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala @@ -0,0 +1,165 @@ +package org.apache.streams.examples.flink.twitter.collection + +import java.util.concurrent.TimeUnit + +import com.fasterxml.jackson.databind.ObjectMapper +import com.google.common.util.concurrent.Uninterruptibles +import com.peoplepattern.streams.pdb.pipelines.FlinkStreamingConfiguration +import 
com.peoplepattern.streams.pdb.flink.{FlinkBase, FlinkUtil} +import com.peoplepattern.streams.pipelines.pdb.TwitterPostsPipelineConfiguration +import com.peoplepattern.streams.twitter.collection.FlinkTwitterPostsPipeline.LOGGER +import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} +import org.apache.flink.api.common.restartstrategy.RestartStrategies +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.core.fs.FileSystem +import org.apache.flink.runtime.state.filesystem.FsStateBackend +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment} +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow +import org.apache.flink.streaming.connectors.fs.RollingSink +import org.apache.flink.streaming.util.serialization.SimpleStringSchema +import org.apache.flink.util.Collector +import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} +import org.apache.streams.core.StreamsDatum +import org.apache.streams.examples.flink.FlinkBase +import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration +import org.apache.streams.flink.FlinkStreamingConfiguration +import org.apache.streams.hdfs.HdfsConfiguration +import org.apache.streams.jackson.StreamsJacksonMapper +import org.apache.streams.twitter.TwitterUserInformationConfiguration +import org.apache.streams.twitter.pojo.{Tweet, User} +import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider} +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConversions._ + +/** + * Created by sblackmon on 7/29/15. 
+ */ +object FlinkTwitterPostsPipeline extends FlinkBase { + + val STREAMS_ID: String = "FlinkTwitterPostsPipeline" + + private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipeline]) + private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance() + + override def main(args: Array[String]) = { + super.main(args) + val jobConfig = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe) + if( setup(jobConfig) == false ) System.exit(1) + val pipeline: FlinkTwitterPostsPipeline = new FlinkTwitterPostsPipeline(jobConfig) + val thread = new Thread(pipeline) + thread.start() + thread.join() + } + + def setup(jobConfig: TwitterPostsPipelineConfiguration): Boolean = { + + LOGGER.info("TwitterPostsPipelineConfiguration: " + jobConfig) + + if( jobConfig == null ) { + LOGGER.error("jobConfig is null!") + System.err.println("jobConfig is null!") + return false + } + + if( jobConfig.getSource == null ) { + LOGGER.error("jobConfig.getSource is null!") + System.err.println("jobConfig.getSource is null!") + return false + } + + if( jobConfig.getDestination == null ) { + LOGGER.error("jobConfig.getDestination is null!") + System.err.println("jobConfig.getDestination is null!") + return false + } + + if( jobConfig.getTwitter == null ) { + LOGGER.error("jobConfig.getTwitter is null!") + System.err.println("jobConfig.getTwitter is null!") + return false + } + + return true + + } + +} + +class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable { + + import FlinkTwitterPostsPipeline._ + + override def run(): Unit = { + + val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration])) + + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.setNumberOfExecutionRetries(0) + + val inPath = buildReaderPath(config.getSource) + + val outPath = buildWriterPath(config.getDestination) + + //val inProps = buildKafkaProps(config.getSourceTopic) + + val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids") + + //val idTopicIn = new KafkaSink() + +// val idTopicOut : DataStream[String] = env.addSource[String]( +// new org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09(config.getSourceTopic.getTopic, new SimpleStringSchema(), +// inProps)); + + val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs ) + + // these datums contain 'Tweet' objects + val tweetDatums: DataStream[StreamsDatum] = + keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(10).name("tweetDatums") + + val tweets: DataStream[Tweet] = tweetDatums + .map(datum => datum.getDocument.asInstanceOf[Tweet]).name("tweets") + + val jsons: DataStream[String] = tweets + .map(tweet => { + val MAPPER = StreamsJacksonMapper.getInstance + MAPPER.writeValueAsString(tweet) + }).name("json") + + if( config.getTest == false ) + jsons.addSink(new RollingSink[String](outPath)).setParallelism(3).name("hdfs") + else + jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) + .setParallelism(env.getParallelism); + + // if( test == true ) jsons.print(); + + env.execute("FlinkTwitterPostsPipeline") + } + + class postCollectorFlatMapFunction 
extends RichFlatMapFunction[String, StreamsDatum] with Serializable { + override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = { + collectPosts(input, out) + } + def collectPosts(id : String, out : Collector[StreamsDatum]) = { + val twitterConfiguration = config.getTwitter + val twitProvider: TwitterTimelineProvider = + new TwitterTimelineProvider( + twitterConfiguration.withInfo(List(FlinkUtil.toProviderId(id))).withMaxItems(200l) + ) + twitProvider.prepare(twitProvider) + twitProvider.startStream() + var iterator: Iterator[StreamsDatum] = null + do { + Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS) + twitProvider.readCurrent().iterator().toList.map(out.collect(_)) + } while( twitProvider.isRunning ) + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala new file mode 100644 index 0000000..a081c74 --- /dev/null +++ b/flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala @@ -0,0 +1,163 @@ +package org.apache.streams.examples.flink.twitter.collection + +import java.lang +import java.util.concurrent.TimeUnit + +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.flink.core.fs.FileSystem +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, WindowFunction} +import org.apache.flink.streaming.api.windowing.assigners.{GlobalWindows, TumblingEventTimeWindows} + +import scala.collection.JavaConversions._ +import com.peoplepattern.streams.twitter.collection.FlinkTwitterUserInformationPipeline.LOGGER +import com.google.common.util.concurrent.Uninterruptibles +import org.apache.streams.examples.flink.FlinkBase +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.api.common.restartstrategy.RestartStrategies +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.streaming.api.scala.{AllWindowedStream, DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream} +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.triggers._ +import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window} +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.{ExecutionEnvironment, _} +import org.apache.flink.runtime.state.filesystem.FsStateBackend +import org.apache.flink.streaming.connectors.fs.RollingSink +import org.apache.flink.util.Collector +import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} +import org.apache.streams.core.StreamsDatum +import org.apache.streams.examples.flink.FlinkBase +import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration +import org.apache.streams.flink.FlinkStreamingConfiguration +import org.apache.streams.hdfs.HdfsConfiguration +import org.apache.streams.jackson.StreamsJacksonMapper +import 
org.apache.streams.twitter.TwitterUserInformationConfiguration +import org.apache.streams.twitter.pojo.{Tweet, User} +import org.apache.streams.twitter.provider.{TwitterTimelineProvider, TwitterUserInformationProvider} +import org.slf4j.{Logger, LoggerFactory} + +/** + * Created by sblackmon on 3/15/16. + */ +object FlinkTwitterUserInformationPipeline extends FlinkBase { + + val STREAMS_ID: String = "FlinkTwitterUserInformationPipeline" + + private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipeline]) + private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance() + + override def main(args: Array[String]) = { + super.main(args) + val jobConfig = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe) + if( setup(jobConfig) == false ) System.exit(1) + val pipeline: FlinkTwitterUserInformationPipeline = new FlinkTwitterUserInformationPipeline(jobConfig) + val thread = new Thread(pipeline) + thread.start() + thread.join() + } + + def setup(jobConfig: TwitterUserInformationPipelineConfiguration): Boolean = { + + LOGGER.info("TwitterFollowingPipelineConfiguration: " + jobConfig) + + if( jobConfig == null ) { + LOGGER.error("jobConfig is null!") + System.err.println("jobConfig is null!") + return false + } + + if( jobConfig.getSource == null ) { + LOGGER.error("jobConfig.getSource is null!") + System.err.println("jobConfig.getSource is null!") + return false + } + + if( jobConfig.getDestination == null ) { + LOGGER.error("jobConfig.getDestination is null!") + System.err.println("jobConfig.getDestination is null!") + return false + } + + if( jobConfig.getTwitter == null ) { + LOGGER.error("jobConfig.getTwitter is null!") + System.err.println("jobConfig.getTwitter is null!") + return false + } + + return true + + } + +} + +class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipelineConfiguration = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(StreamsConfigurator.getConfig)) extends Runnable with java.io.Serializable { + + import FlinkTwitterUserInformationPipeline._ + + override def run(): Unit = { + + val env: StreamExecutionEnvironment = streamEnvironment(MAPPER.convertValue(config, classOf[FlinkStreamingConfiguration])) + + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + env.setNumberOfExecutionRetries(0) + + val inPath = buildReaderPath(config.getSource) + + val outPath = buildWriterPath(config.getDestination) + + val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids") + + val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").keyBy( id => (id.hashCode % 100).abs ) + + val idWindows: WindowedStream[String, Int, GlobalWindow] = keyed_ids.countWindow(100) + + val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists") + + val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(10).name("userDatums") + + val user: DataStream[User] = userDatums.map(datum => datum.getDocument.asInstanceOf[User]).name("users") + + val jsons: DataStream[String] = user + .map(user => { + val MAPPER = StreamsJacksonMapper.getInstance + MAPPER.writeValueAsString(user) + }).name("jsons") + + if( config.getTest == false ) + jsons.addSink(new 
RollingSink[String](outPath)).setParallelism(3).name("hdfs") + else + jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) + .setParallelism(env.getParallelism); + + LOGGER.info("StreamExecutionEnvironment: {}", env.toString ) + + env.execute("FlinkTwitterUserInformationPipeline") + } + + class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] { + override def apply(key: Int, window: GlobalWindow, input: Iterable[String], out: Collector[List[String]]): Unit = {if( input.size > 0 ) + out.collect(input.map(id => FlinkUtil.toProviderId(id)).toList) + } + } + + class profileCollectorFlatMapFunction extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable { + override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = { + collectProfiles(input, out) + } + def collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = { + val twitterConfiguration = config.getTwitter + val twitProvider: TwitterUserInformationProvider = + new TwitterUserInformationProvider( + twitterConfiguration.withInfo(ids) + ) + twitProvider.prepare(twitProvider) + twitProvider.startStream() + var iterator: Iterator[StreamsDatum] = null + do { + Uninterruptibles.sleepUninterruptibly(config.getProviderWaitMs, TimeUnit.MILLISECONDS) + twitProvider.readCurrent().iterator().toList.map(out.collect(_)) + } while( twitProvider.isRunning ) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md new file mode 100644 index 0000000..22f30f5 --- /dev/null +++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterFollowingPipeline.md @@ -0,0 +1,41 @@ +FlinkTwitterFollowingPipeline +============================= + +Description: +----------------- + +Collects twitter friends or followers with flink. 
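
Not part of this commit: the example configuration linked below (and the `FlinkTwitterFollowingPipeline.conf` test resource in the file list) is not reproduced in this diff, so the following is only a minimal sketch of what a job configuration and a programmatic launch might look like. The HOCON key names for the `source`, `destination`, and `twitter.oauth` blocks are assumptions inferred from the JSON schemas above and the getters used in `FlinkBase.buildReaderPath`/`buildWriterPath`; check the committed `.conf` files for the authoritative shape.

    // Sketch only -- not part of this commit; key names below are assumptions.
    import com.typesafe.config.ConfigFactory
    import org.apache.streams.config.ComponentConfigurator
    import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration
    import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline

    object FollowingPipelineSketch {
      def main(args: Array[String]): Unit = {
        val hocon =
          """
            |mode = "streaming"          # read by FlinkBase.setup when launching via the bundled main()
            |test = true                 # write plain text instead of a RollingSink
            |local = true                # use a local Flink environment
            |source {                    # HdfsReaderConfiguration: where the seed ids live
            |  scheme = "file"           # assumed literal for HdfsConfiguration.Scheme.FILE
            |  path = "target/test-classes"
            |  readerPath = "asf.txt"
            |}
            |destination {               # HdfsWriterConfiguration: where follow JSON is written
            |  scheme = "file"
            |  path = "target/test-data"
            |  writerPath = "following"
            |}
            |twitter {                   # TwitterFollowingConfiguration
            |  endpoint = "followers"    # or "friends" (assumed property name)
            |  oauth {                   # assumed credential key names
            |    consumerKey = "..."
            |    consumerSecret = "..."
            |    accessToken = "..."
            |    accessTokenSecret = "..."
            |  }
            |}
          """.stripMargin
        val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](
          classOf[TwitterFollowingPipelineConfiguration])
          .detectConfiguration(ConfigFactory.parseString(hocon).resolve())
        // The pipeline is a Runnable; the bundled main() runs it on its own thread the same way.
        val thread = new Thread(new FlinkTwitterFollowingPipeline(jobConfig))
        thread.start()
        thread.join()
      }
    }
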
+ +Specification: +----------------- + +[FlinkTwitterFollowingPipeline.dot](FlinkTwitterFollowingPipeline.dot "FlinkTwitterFollowingPipeline.dot" ) + +Diagram: +----------------- + + + +Example Configuration: +---------------------- + +[FlinkTwitterFollowingPipeline.json](FlinkTwitterFollowingPipeline.json "FlinkTwitterFollowingPipeline.json" ) + +Run (Local): +------------ + + java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline + +Run (Flink): +------------ + + flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline http://<location_of_config_file> + +Run (YARN): +----------- + + flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline http://<location_of_config_file> + +[JavaDocs](apidocs/index.html "JavaDocs") + +###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md new file mode 100644 index 0000000..5f77994 --- /dev/null +++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterPostsPipeline.md @@ -0,0 +1,41 @@ +FlinkTwitterPostsPipeline +========================= + +Description: +----------------- + +Collects twitter posts with flink. 
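
Not part of this commit: a minimal sketch of what `postCollectorFlatMapFunction` (shown earlier in this diff) does for each seed id, pulled out of Flink so it can be run standalone. It assumes a resolved Typesafe config with a `twitter` credentials block; the seed value and the 1000 ms poll interval (the `providerWaitMs` default) are placeholders.

    // Sketch only -- mirrors the per-id collection loop in postCollectorFlatMapFunction.
    import java.util.concurrent.TimeUnit
    import scala.collection.JavaConversions._
    import com.google.common.util.concurrent.Uninterruptibles
    import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator}
    import org.apache.streams.jackson.StreamsJacksonMapper
    import org.apache.streams.twitter.TwitterUserInformationConfiguration
    import org.apache.streams.twitter.pojo.Tweet
    import org.apache.streams.twitter.provider.TwitterTimelineProvider

    object TimelineCollectionSketch {
      def main(args: Array[String]): Unit = {
        val mapper = StreamsJacksonMapper.getInstance
        val twitterConfiguration = new ComponentConfigurator[TwitterUserInformationConfiguration](
          classOf[TwitterUserInformationConfiguration])
          .detectConfiguration(StreamsConfigurator.getConfig.getConfig("twitter"))
        // One timeline per seed id, capped at 200 items, exactly as the flat-map does.
        val provider = new TwitterTimelineProvider(
          twitterConfiguration.withInfo(List("apache")).withMaxItems(200L)) // placeholder id
        provider.prepare(provider)
        provider.startStream()
        // Drain the provider until it reports completion, as the do/while loop above does.
        do {
          Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS)
          provider.readCurrent().iterator().toList
            .map(_.getDocument.asInstanceOf[Tweet])
            .foreach(tweet => println(mapper.writeValueAsString(tweet)))
        } while (provider.isRunning)
      }
    }
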
+ +Specification: +----------------- + +[FlinkTwitterPostsPipeline.dot](FlinkTwitterPostsPipeline.dot "FlinkTwitterPostsPipeline.dot" ) + +Diagram: +----------------- + + + +Example Configuration: +---------------------- + +[FlinkTwitterPostsPipeline.json](FlinkTwitterPostsPipeline.json "FlinkTwitterPostsPipeline.json" ) + +Run (Local): +------------ + + java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline + +Run (Flink): +------------ + + flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> + +Run (YARN): +----------- + + flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterPostsPipeline http://<location_of_config_file> + +[JavaDocs](apidocs/index.html "JavaDocs") + +###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md new file mode 100644 index 0000000..5e0d1fe --- /dev/null +++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md @@ -0,0 +1,41 @@ +FlinkTwitterUserInformationPipeline +=================================== + +Description: +----------------- + +Collects twitter users with flink. 
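
Not part of this commit: the user-information pipeline's sink (shown earlier in this diff) serializes one `User` per line with `StreamsJacksonMapper`, so its output can be read back the same way. A minimal sketch, assuming a local `test = true` run; the output path and the `getScreenName` accessor on the generated `User` pojo are assumptions.

    // Sketch only -- parse the newline-delimited JSON the user-information pipeline writes.
    import scala.io.Source
    import org.apache.streams.jackson.StreamsJacksonMapper
    import org.apache.streams.twitter.pojo.User

    object ReadCollectedUsers {
      def main(args: Array[String]): Unit = {
        val mapper = StreamsJacksonMapper.getInstance
        Source.fromFile("target/test-data/userinfo") // placeholder output path
          .getLines()
          .map(line => mapper.readValue(line, classOf[User]))
          .foreach(user => println(user.getScreenName))
      }
    }
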
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
new file mode 100644
index 0000000..5e0d1fe
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/FlinkTwitterUserInformationPipeline.md
@@ -0,0 +1,41 @@
+FlinkTwitterUserInformationPipeline
+===================================
+
+Description:
+-----------------
+
+Collects twitter users with flink.
+
+Specification:
+-----------------
+
+[FlinkTwitterUserInformationPipeline.dot](FlinkTwitterUserInformationPipeline.dot "FlinkTwitterUserInformationPipeline.dot" )
+
+Diagram:
+-----------------
+
+
+
+Example Configuration:
+----------------------
+
+[FlinkTwitterUserInformationPipeline.json](FlinkTwitterUserInformationPipeline.json "FlinkTwitterUserInformationPipeline.json" )
+
+Run (Local):
+------------
+
+    java -cp dist/flink-twitter-collection-jar-with-dependencies.jar -Dconfig.file=file://<location_of_config_file> org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline
+
+Run (Flink):
+------------
+
+    flink-run.sh dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline http://<location_of_config_file>
+
+Run (YARN):
+-----------
+
+    flink-run.sh yarn dist/flink-twitter-collection-jar-with-dependencies.jar org.apache.streams.examples.flink.twitter.collection.FlinkTwitterUserInformationPipeline http://<location_of_config_file>
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
\ No newline at end of file
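Each pipeline above reads its seed ids through a source block (HdfsReaderConfiguration) and writes collected documents through a destination block (HdfsWriterConfiguration), per the *PipelineConfiguration schemas later in this diff. A sketch of those two blocks for a local run against a flat id file such as the 1000twitterids.txt test resource follows; the nested key names follow streams-persist-hdfs conventions and are assumptions, not something this commit confirms.

    # Hypothetical source/destination blocks shared by the three pipeline configs
    # (nested key names are assumed from streams-persist-hdfs, verify before use)
    source {
      scheme = file                       # file:// locally, hdfs:// on a cluster (assumed)
      path = "target/test-classes"
      readerPath = "1000twitterids.txt"   # newline-delimited twitter ids, as in the test resource
    }
    destination {
      scheme = file
      path = "target/output"
      writerPath = "userinfo"             # directory the pipeline writes json documents into
    }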
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/markdown/index.md
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/markdown/index.md b/flink/flink-twitter-collection/src/site/markdown/index.md
new file mode 100644
index 0000000..19e44cf
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/markdown/index.md
@@ -0,0 +1,32 @@
+Apache Streams (incubating)
+Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
+--------------------------------------------------------------------------------
+
+flink-twitter-collection
+========================
+
+Requirements:
+-------------
+ - Authorized Twitter API credentials
+
+Description:
+------------
+Collects large batches of documents from api.twitter.com from a seed set of ids.
+
+Streams:
+--------
+
+<a href="FlinkTwitterUserInformationPipeline.html" target="_self">FlinkTwitterUserInformationPipeline</a>
+
+<a href="FlinkTwitterPostsPipeline.html" target="_self">FlinkTwitterPostsPipeline</a>
+
+<a href="FlinkTwitterFollowingPipeline.html" target="_self">FlinkTwitterFollowingPipeline</a>
+
+Build:
+---------
+
+    mvn clean install verify
+
+[JavaDocs](apidocs/index.html "JavaDocs")
+
+###### Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
new file mode 100644
index 0000000..30a2942
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkBatchConfiguration.json
@@ -0,0 +1,12 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.FlinkBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "StreamsFlinkConfiguration.json"
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json b/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
new file mode 100644
index 0000000..0d63f4e
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/FlinkStreamingConfiguration.json
@@ -0,0 +1,40 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.FlinkStreamingConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "StreamsFlinkConfiguration.json"
+  },
+  "properties": {
+    "parallel": {
+      "type": "integer",
+      "default": 1
+    },
+    "providerWaitMs": {
+      "type": "integer",
+      "default": 1000
+    },
+    "checkpointIntervalMs": {
+      "type": "integer",
+      "default": 300000
+    },
+    "checkpointTimeoutMs": {
+      "type": "integer",
+      "default": 30000
+    },
+    "restartAttempts": {
+      "type": "integer",
+      "description": "number of restart attempts",
+      "default": 3
+    },
+    "restartDelayMs": {
+      "type": "integer",
+      "description": "delay in milliseconds",
+      "default": 10000
+    }
+  }
+}
\ No newline at end of file
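FlinkStreamingConfiguration.json above carries defaults for the streaming runtime: parallelism 1, a 1000 ms provider wait, a 300000 ms checkpoint interval with a 30000 ms timeout, and 3 restart attempts spaced 10000 ms apart. Because the pipeline configurations extend this schema, any of these keys can be overridden at the top level of a pipeline .conf; the keys below come straight from the schema, while the values are illustrative only.

    # Illustrative overrides of the FlinkStreamingConfiguration defaults above
    parallel = 4                  # schema default 1
    providerWaitMs = 2000         # schema default 1000
    checkpointIntervalMs = 60000  # schema default 300000
    checkpointTimeoutMs = 60000   # schema default 30000
    restartAttempts = 5           # schema default 3
    restartDelayMs = 30000        # schema default 10000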
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json b/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
new file mode 100644
index 0000000..ef78357
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/StreamsFlinkConfiguration.json
@@ -0,0 +1,48 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.flink.StreamsFlinkConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "http://streams.peoplepattern.com/incubator-streams/0.3.9-PP-SNAPSHOT/streams-config/StreamsConfiguration.json"
+  },
+  "properties": {
+    "parallel": {
+      "type": "integer",
+      "default": 1
+    },
+    "providerWaitMs": {
+      "type": "integer",
+      "default": 1000
+    },
+    "checkpointIntervalMs": {
+      "type": "integer",
+      "default": 300000
+    },
+    "checkpointTimeoutMs": {
+      "type": "integer",
+      "default": 30000
+    },
+    "test": {
+      "type": "boolean",
+      "default": false
+    },
+    "local": {
+      "type": "boolean",
+      "default": true
+    },
+    "restartAttempts": {
+      "type": "integer",
+      "description": "number of restart attempts",
+      "default": 3
+    },
+    "restartDelayMs": {
+      "type": "integer",
+      "description": "delay in milliseconds",
+      "default": 10000
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
new file mode 100644
index 0000000..33afb29
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "com.peoplepattern.streams.pipelines.pdb.TwitterFollowingBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+    },
+    "hdfs": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "s3": {
+      "type": "object",
+      "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
new file mode 100644
index 0000000..de4f9bb
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterFollowingPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterFollowingConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
new file mode 100644
index 0000000..376bb4d
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterPostsBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "com.peoplepattern.streams.pipelines.pdb.TwitterPostsBatchConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "hdfs": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "s3": {
+      "type": "object",
+      "javaType": "org.apache.streams.s3.S3WriterConfiguration"
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
new file mode 100644
index 0000000..628d7ee
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterPostsPipelineConfiguration.json
@@ -0,0 +1,29 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "extends": {
+    "$ref": "FlinkStreamingConfiguration.json"
+  },
+  "properties": {
+    "twitter": {
+      "type": "object",
+      "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration"
+    },
+    "source": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration"
+    },
+    "destination": {
+      "type": "object",
+      "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration"
+    },
+    "providerWaitMs": {
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
----------------------------------------------------------------------
diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
new file mode 100644
index 0000000..55f9fbd
--- /dev/null
+++ b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationBatchConfiguration.json
@@ -0,0 +1,23 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "type": "object",
+  "javaType" : "com.peoplepattern.streams.pipelines.pdb.TwitterUserInformationBatchConfiguration",
["java.io.Serializable"], + "properties": { + "twitter": { + "type": "object", + "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration" + }, + "hdfs": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration" + }, + "s3": { + "type": "object", + "javaType": "org.apache.streams.s3.S3WriterConfiguration" + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams-examples/blob/4491cfe1/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json ---------------------------------------------------------------------- diff --git a/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json new file mode 100644 index 0000000..5261748 --- /dev/null +++ b/flink/flink-twitter-collection/src/site/resources/TwitterUserInformationPipelineConfiguration.json @@ -0,0 +1,29 @@ +{ + "$schema": "http://json-schema.org/draft-03/schema", + "$license": [ + "http://www.apache.org/licenses/LICENSE-2.0" + ], + "type": "object", + "javaType" : "org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration", + "javaInterfaces": ["java.io.Serializable"], + "extends": { + "$ref": "FlinkStreamingConfiguration.json" + }, + "properties": { + "twitter": { + "type": "object", + "javaType": "org.apache.streams.twitter.TwitterUserInformationConfiguration" + }, + "source": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsReaderConfiguration" + }, + "destination": { + "type": "object", + "javaType": "org.apache.streams.hdfs.HdfsWriterConfiguration" + }, + "providerWaitMs": { + "type": "integer" + } + } +} \ No newline at end of file
