This is an automated email from the ASF dual-hosted git repository. sblackmon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/streams.git
commit c8f58e2ad23c33092337357854443f1bfeef2606 Author: Steve Blackmon <[email protected]> AuthorDate: Tue Aug 4 14:25:50 2020 -0500 STREAMS-668 upgrade flink examples to latest flink version STREAMS-668 upgrade flink examples to latest flink version 1.11.1 also improved threading throughout streams-provider-twitter added accumulators to twitter flink pipelines --- .../twitter/provider/TwitterFollowingProvider.java | 4 +- .../twitter/provider/TwitterTimelineProvider.java | 31 +++-- .../provider/TwitterUserInformationProvider.java | 127 +++++++++++++-------- .../TwitterUserInformationProviderTask.java | 29 ++++- .../flink-twitter-collection/pom.xml | 26 +++-- .../apache/streams/examples/flink/FlinkBase.scala | 13 ++- .../collection/FlinkTwitterFollowingPipeline.scala | 103 ++++++++--------- .../collection/FlinkTwitterPostsPipeline.scala | 63 +++++++--- .../collection/FlinkTwitterSpritzerPipeline.scala | 58 ++++++---- .../FlinkTwitterUserInformationPipeline.scala | 62 +++++++--- .../FollowingCollectorFlatMapFunction.scala | 52 +++++++++ .../TimelineCollectorFlatMapFunction.scala | 52 +++++++++ .../UserInformationCollectorFlatMapFunction.scala | 48 ++++++++ .../FlinkTwitterFollowingPipelineFollowersIT.conf | 3 +- .../FlinkTwitterFollowingPipelineFriendsIT.conf | 1 + .../resources/FlinkTwitterPostsPipelineIT.conf | 1 + .../FlinkTwitterUserInformationPipelineIT.conf | 9 +- .../src/test/resources/testng.xml | 54 +++++++++ .../FlinkTwitterFollowingPipelineFollowersIT.scala | 2 +- .../FlinkTwitterFollowingPipelineFriendsIT.scala | 4 +- .../twitter/test/FlinkTwitterPostsPipelineIT.scala | 4 +- .../FlinkTwitterUserInformationPipelineIT.scala | 4 +- 22 files changed, 562 insertions(+), 188 deletions(-) diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java index 
4603803..eb472ea 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java @@ -332,6 +332,8 @@ public class TwitterFollowingProvider implements Callable<Iterator<Follow>>, Str } public boolean isRunning() { + LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty()); + LOGGER.debug("providerQueue.size: {}", providerQueue.size()); LOGGER.debug("executor.isShutdown: {}", executor.isShutdown()); LOGGER.debug("executor.isTerminated: {}", executor.isTerminated()); LOGGER.debug("tasks.size(): {}", tasks.size()); @@ -346,7 +348,7 @@ public class TwitterFollowingProvider implements Callable<Iterator<Follow>>, Str allTasksComplete = false; } LOGGER.debug("allTasksComplete: {}", allTasksComplete); - boolean finished = allTasksComplete && tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated(); + boolean finished = tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() && allTasksComplete && providerQueue.size() == 0; LOGGER.debug("finished: {}", finished); if ( finished ) { running.set(false); diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java index eee81c4..126683c 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java @@ -206,6 +206,7 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea } Objects.requireNonNull(providerQueue); + 
Objects.requireNonNull(config); Objects.requireNonNull(config.getOauth().getConsumerKey()); Objects.requireNonNull(config.getOauth().getConsumerSecret()); Objects.requireNonNull(config.getOauth().getAccessToken()); @@ -244,6 +245,8 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea ) ); + Objects.requireNonNull(executor); + submitTimelineThreads(ids, names); LOGGER.info("tasks: {}", tasks.size()); @@ -323,6 +326,8 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea running.set(false); LOGGER.info("Exiting"); + } else { + LOGGER.info("Not Finished Yet..."); } return result; @@ -350,25 +355,37 @@ public class TwitterTimelineProvider implements Callable<Iterator<Tweet>>, Strea } @Override - public void cleanUp() { - ExecutorUtils.shutdownAndAwaitTermination(executor); - } - - @Override public boolean isRunning() { LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty()); LOGGER.debug("providerQueue.size: {}", providerQueue.size()); + LOGGER.debug("executor.isShutdown: {}", executor.isShutdown()); LOGGER.debug("executor.isTerminated: {}", executor.isTerminated()); LOGGER.debug("tasks.size(): {}", tasks.size()); LOGGER.debug("futures.size(): {}", futures.size()); - if ( tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() ) { + boolean allTasksComplete; + if( futures.size() > 0) { + allTasksComplete = true; + for(Future<?> future : futures){ + allTasksComplete |= !future.isDone(); // check if future is done + } + } else { + allTasksComplete = false; + } + LOGGER.debug("allTasksComplete: {}", allTasksComplete); + boolean finished = tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated() && allTasksComplete && providerQueue.size() == 0; + LOGGER.debug("finished: {}", finished); + if ( finished ) { running.set(false); } - LOGGER.debug("isRunning: ", running.get()); return running.get(); } @Override + 
public void cleanUp() { + ExecutorUtils.shutdownAndAwaitTermination(executor); + } + + @Override public Iterator<Tweet> call() { prepare(config); startStream(); diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java index 6ceeb88..dc0d050 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java +++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java @@ -40,6 +40,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; + import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.Config; @@ -56,6 +57,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.PrintStream; import java.io.Serializable; +import java.lang.Runnable; import java.math.BigInteger; import java.util.ArrayList; import java.util.Collections; @@ -64,6 +66,8 @@ import java.util.List; import java.util.Objects; import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; @@ -103,14 +107,15 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>, protected Twitter client; - protected ListeningExecutorService executor; + protected ExecutorService executor; protected DateTime start; protected DateTime end; protected final 
AtomicBoolean running = new AtomicBoolean(); - private List<ListenableFuture<Object>> futures = new ArrayList<>(); + private List<Runnable> tasks = new ArrayList<>(); + private List<Future<Object>> futures = new ArrayList<>(); /** * To use from command line: @@ -165,10 +170,11 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>, System.err.println(ex.getMessage()); } } + outStream.flush(); } while ( provider.isRunning()); provider.cleanUp(); - outStream.flush(); + outStream.close(); } // TODO: this should be abstracted out @@ -199,9 +205,19 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>, public void prepare(Object configurationObject) { if ( configurationObject instanceof TwitterFollowingConfiguration ) { - config = (TwitterUserInformationConfiguration) configurationObject; + this.config = (TwitterUserInformationConfiguration) configurationObject; } + streamsConfiguration = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig()); + + try { + lock.writeLock().lock(); + providerQueue = QueueUtils.constructQueue(); + } finally { + lock.writeLock().unlock(); + } + + Objects.requireNonNull(providerQueue); Objects.requireNonNull(config); Objects.requireNonNull(config.getOauth()); Objects.requireNonNull(config.getOauth().getConsumerKey()); @@ -211,10 +227,6 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>, Objects.requireNonNull(config.getInfo()); Objects.requireNonNull(config.getThreadsPerProvider()); - streamsConfiguration = StreamsConfigurator.detectConfiguration(); - - Objects.requireNonNull(streamsConfiguration.getQueueSize()); - try { client = getTwitterClient(); } catch (InstantiationException e) { @@ -223,15 +235,6 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>, Objects.requireNonNull(client); - try { - lock.writeLock().lock(); - providerQueue = QueueUtils.constructQueue(); - } finally { - 
lock.writeLock().unlock(); - } - - Objects.requireNonNull(providerQueue); - for (String s : config.getInfo()) { if (s != null) { String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase(); @@ -247,7 +250,7 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>, } executor = MoreExecutors.listeningDecorator( - ExecutorUtils.newFixedThreadPoolWithQueueSize( + ExecutorUtils.newFixedThreadPoolWithQueueSize( config.getThreadsPerProvider().intValue(), streamsConfiguration.getQueueSize().intValue() ) @@ -255,12 +258,38 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>, Objects.requireNonNull(executor); - // Twitter allows for batches up to 100 per request, but you cannot mix types submitUserInformationThreads(ids, names); + + LOGGER.info("tasks: {}", tasks.size()); + LOGGER.info("futures: {}", futures.size()); + + } + + @Override + public void startStream() { + + Objects.requireNonNull(executor); + + LOGGER.info("startStream"); + + running.set(true); + + LOGGER.info("running: {}", running.get()); + + ExecutorUtils.shutdownAndAwaitTermination(executor); + + LOGGER.info("running: {}", running.get()); + } protected void submitUserInformationThreads(List<Long> ids, List<String> names) { + /* + while( idsIndex < ids.size() ) { + from = idsIndex + to = Math.min( idsIndex + 100, ids.size() - idsIndex + } + */ int idsIndex = 0; while( idsIndex + 100 < ids.size() ) { List<Long> batchIds = ids.subList(idsIndex, idsIndex + 100); @@ -268,7 +297,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>, this, client, new UsersLookupRequest().withUserId(batchIds)); - ListenableFuture future = executor.submit(providerTask); + tasks.add(providerTask); + Future future = executor.submit((Callable)providerTask); futures.add(future); LOGGER.info("Thread Submitted: {}", providerTask.request); idsIndex += 100; @@ -279,7 +309,8 @@ public class TwitterUserInformationProvider implements 
Callable<Iterator<User>>, this, client, new UsersLookupRequest().withUserId(batchIds)); - ListenableFuture future = executor.submit(providerTask); + tasks.add(providerTask); + Future future = executor.submit((Callable)providerTask); futures.add(future); LOGGER.info("Thread Submitted: {}", providerTask.request); } @@ -291,7 +322,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>, this, client, new UsersLookupRequest().withScreenName(batchNames)); - ListenableFuture future = executor.submit(providerTask); + tasks.add(providerTask); + Future future = executor.submit((Callable)providerTask); futures.add(future); LOGGER.info("Thread Submitted: {}", providerTask.request); namesIndex += 100; @@ -302,7 +334,8 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>, this, client, new UsersLookupRequest().withScreenName(batchNames)); - ListenableFuture future = executor.submit(providerTask); + tasks.add(providerTask); + Future future = executor.submit((Callable)providerTask); futures.add(future); LOGGER.info("Thread Submitted: {}", providerTask.request); } @@ -310,27 +343,16 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>, } @Override - public void startStream() { - - Objects.requireNonNull(executor); - - LOGGER.info("startStream: {} Threads", futures.size()); - - running.set(true); - - executor.shutdown(); - } - - @Override public StreamsResultSet readCurrent() { StreamsResultSet result; + LOGGER.debug("Providing {} docs", providerQueue.size()); + try { lock.writeLock().lock(); result = new StreamsResultSet(providerQueue); providerQueue = QueueUtils.constructQueue(); - LOGGER.debug("readCurrent: {} Documents", result.size()); } finally { lock.writeLock().unlock(); } @@ -347,27 +369,38 @@ public class TwitterUserInformationProvider implements Callable<Iterator<User>>, @Override public StreamsResultSet readRange(DateTime start, DateTime end) { LOGGER.debug("{} readRange", 
STREAMS_ID); - this.start = start; - this.end = end; - readCurrent(); - return (StreamsResultSet)providerQueue.iterator(); + throw new NotImplementedException(); } + protected Twitter getTwitterClient() throws InstantiationException { + return Twitter.getInstance(config); + } @Override public boolean isRunning() { - if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) { - LOGGER.info("All Threads Completed"); + LOGGER.debug("providerQueue.isEmpty: {}", providerQueue.isEmpty()); + LOGGER.debug("providerQueue.size: {}", providerQueue.size()); + LOGGER.debug("executor.isTerminated: {}", executor.isTerminated()); + LOGGER.debug("tasks.size(): {}", tasks.size()); + LOGGER.debug("futures.size(): {}", futures.size()); + boolean allTasksComplete; + if( futures.size() > 0) { + allTasksComplete = true; + for(Future<?> future : futures){ + allTasksComplete |= !future.isDone(); // check if future is done + } + } else { + allTasksComplete = false; + } + LOGGER.debug("allTasksComplete: {}", allTasksComplete); + boolean finished = allTasksComplete && tasks.size() > 0 && tasks.size() == futures.size() && executor.isShutdown() && executor.isTerminated(); + LOGGER.debug("finished: {}", finished); + if ( finished ) { running.set(false); - LOGGER.info("Exiting"); } return running.get(); } - protected Twitter getTwitterClient() throws InstantiationException { - return Twitter.getInstance(config); - } - @Override public void cleanUp() { ExecutorUtils.shutdownAndAwaitTermination(executor); diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java index 418ba83..40ebfcf 100644 --- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java +++ 
b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java @@ -23,6 +23,7 @@ import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.twitter.api.Twitter; import org.apache.streams.twitter.api.UsersLookupRequest; import org.apache.streams.twitter.converter.TwitterDateTimeFormat; +import org.apache.streams.twitter.pojo.Tweet; import org.apache.streams.twitter.pojo.User; import org.apache.streams.util.ComponentUtils; @@ -31,14 +32,17 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.Callable; import java.util.stream.Collectors; import java.util.stream.Stream; /** * Retrieve recent posts for a single user id. */ -public class TwitterUserInformationProviderTask implements Runnable { +public class TwitterUserInformationProviderTask implements Callable<Iterator<User>>, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProviderTask.class); @@ -47,6 +51,7 @@ public class TwitterUserInformationProviderTask implements Runnable { protected TwitterUserInformationProvider provider; protected Twitter client; protected UsersLookupRequest request; + protected List<User> responseList; /** * TwitterTimelineProviderTask constructor. 
@@ -60,19 +65,37 @@ public class TwitterUserInformationProviderTask implements Runnable { this.request = request; } + int item_count = 0; + @Override public void run() { LOGGER.info("Thread Starting: {}", request.toString()); + responseList = new ArrayList<>(); + List<User> users = client.lookup(request); - for (User user : users) { - ComponentUtils.offerUntilSuccess(new StreamsDatum(user), provider.providerQueue); + responseList.addAll(users); + + int item_count = 0; + + if( users.size() > 0 ) { + for (User user : users) { + ComponentUtils.offerUntilSuccess(new StreamsDatum(user), provider.providerQueue); + LOGGER.debug("User: {}", user.getIdStr()); + } } LOGGER.info("Thread Finished: {}", request.toString()); + LOGGER.info("item_count: {} ", item_count); + } + @Override + public Iterator<User> call() throws Exception { + run(); + return responseList.iterator(); + } } diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml index f39ae0a..1e2de10 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/pom.xml @@ -34,7 +34,7 @@ <properties> <testng.version>6.9.10</testng.version> <hdfs.version>2.7.0</hdfs.version> - <flink.version>1.4.2</flink.version> + <flink.version>1.11.1</flink.version> <scala.version>2.11.12</scala.version> <scala.binary.version>2.11</scala.binary.version> <scalapc.version>1.1.0</scalapc.version> @@ -127,6 +127,18 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs-httpfs</artifactId> + <type>war</type> + <version>${hdfs.version}</version> + <exclusions> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> 
<version>${flink.version}</version> @@ -433,15 +445,9 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-failsafe-plugin</artifactId> <configuration> - <!-- Run integration test suite rather than individual tests. --> - <excludes> - <exclude>**/*Test.java</exclude> - <exclude>**/*Tests.java</exclude> - </excludes> - <includes> - <include>**/*IT.java</include> - <include>**/*ITs.java</include> - </includes> + <suiteXmlFiles> + <suiteXmlFile>target/test-classes/testng.xml</suiteXmlFile> + </suiteXmlFiles> </configuration> <dependencies> <dependency> diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala index 1709d4c..3acca05 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/FlinkBase.scala @@ -66,7 +66,7 @@ trait FlinkBase { if(StringUtils.isNotEmpty(configUrl)) { BASELOGGER.info("StreamsConfigurator.resolveConfig(configUrl): {}", StreamsConfigurator.resolveConfig(configUrl)) try { - typesafe = StreamsConfigurator.resolveConfig(configUrl).withFallback(StreamsConfigurator.getConfig).resolve() + typesafe = StreamsConfigurator.resolveConfig(configUrl) } catch { case mue: MalformedURLException => { BASELOGGER.error("Invalid Configuration URL: ", mue) @@ -77,6 +77,7 @@ trait FlinkBase { return false } } + StreamsConfigurator.addConfig(typesafe) } else { typesafe = StreamsConfigurator.getConfig @@ -177,9 +178,10 @@ trait FlinkBase { } else if (configObject.getScheme.toString.equals("s3")) { inPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getReaderPath - } else { - throw new Exception("scheme 
not recognized: " + configObject.getScheme) } +// else { +// throw new Exception("scheme not recognized: " + configObject.getScheme) +// } inPathBuilder } @@ -193,9 +195,10 @@ trait FlinkBase { } else if( configObject.getScheme.toString.equals("s3")) { outPathBuilder = configObject.getScheme + "://" + configObject.getPath + "/" + configObject.getWriterPath - } else { - throw new Exception("output scheme not recognized: " + configObject.getScheme) } +// else { +// throw new Exception("output scheme not recognized: " + configObject.getScheme) +// } outPathBuilder } diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala index bbf83f8..bae988c 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterFollowingPipeline.scala @@ -19,32 +19,28 @@ package org.apache.streams.examples.flink.twitter.collection import java.util.Objects -import java.util.concurrent.TimeUnit import com.fasterxml.jackson.databind.ObjectMapper -import com.google.common.util.concurrent.Uninterruptibles import org.apache.commons.lang3.StringUtils -import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.api.common.JobExecutionResult +import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.api.scala._ import org.apache.flink.core.fs.FileSystem +import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.TimeCharacteristic -import 
org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment} -import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink -import org.apache.flink.util.Collector -import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink +import org.apache.flink.streaming.api.scala.DataStream +import org.apache.flink.streaming.api.scala.KeyedStream +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.streams.config.StreamsConfigurator import org.apache.streams.core.StreamsDatum import org.apache.streams.examples.flink.FlinkBase import org.apache.streams.examples.flink.twitter.TwitterFollowingPipelineConfiguration -import org.apache.streams.flink.{FlinkStreamingConfiguration, StreamsFlinkConfiguration} -import org.apache.streams.hdfs.{HdfsReaderConfiguration, HdfsWriterConfiguration} import org.apache.streams.jackson.StreamsJacksonMapper -import org.apache.streams.twitter.config.TwitterFollowingConfiguration import org.apache.streams.twitter.pojo.Follow -import org.apache.streams.twitter.provider.TwitterFollowingProvider import org.hamcrest.MatcherAssert -import org.slf4j.{Logger, LoggerFactory} - -import scala.collection.JavaConversions._ +import org.slf4j.Logger +import org.slf4j.LoggerFactory /** * FlinkTwitterFollowingPipeline collects friends or followers of all profiles from a @@ -58,8 +54,12 @@ object FlinkTwitterFollowingPipeline extends FlinkBase { private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance() override def main(args: Array[String]) = { - super.main(args) - val jobConfig = new ComponentConfigurator[TwitterFollowingPipelineConfiguration](classOf[TwitterFollowingPipelineConfiguration]).detectConfiguration(typesafe) + if( args.length > 0 ) { + LOGGER.info("Args: {}", args) + configUrl = args(0) + } + if( !setup(configUrl) ) System.exit(1) + val jobConfig = new 
StreamsConfigurator(classOf[TwitterFollowingPipelineConfiguration]).detectCustomConfiguration() if( !setup(jobConfig) ) System.exit(1) val pipeline: FlinkTwitterFollowingPipeline = new FlinkTwitterFollowingPipeline(jobConfig) val thread = new Thread(pipeline) @@ -119,6 +119,7 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio val env: StreamExecutionEnvironment = streamEnvironment(config) + env.setParallelism(streamsConfig.getParallelism().toInt) env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) env.setNumberOfExecutionRetries(0) @@ -126,54 +127,54 @@ class FlinkTwitterFollowingPipeline(config: TwitterFollowingPipelineConfiguratio val outPath = buildWriterPath(config.getDestination) - val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).keyBy( id => (id.hashCode % 100).abs ) + val ids: DataStream[String] = env.readTextFile(inPath) + + val keyed_ids: KeyedStream[String, Int] = ids. + name("keyed_ids"). + setParallelism(streamsConfig.getParallelism().toInt). + keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs ) // these datums contain 'Follow' objects - val followDatums: DataStream[StreamsDatum] = - keyed_ids.flatMap(new FollowingCollectorFlatMapFunction(config.getTwitter)).setParallelism(10) + val followDatums: DataStream[StreamsDatum] = keyed_ids. + flatMap(new FollowingCollectorFlatMapFunction(streamsConfig, config.getTwitter, streamsFlinkConfiguration)). + name("followDatums"). + setParallelism(streamsConfig.getParallelism().toInt) - val follows: DataStream[Follow] = followDatums + val follows: DataStream[Follow] = followDatums. + name("follows") .map(datum => datum.getDocument.asInstanceOf[Follow]) - val jsons: DataStream[String] = follows + val jsons: DataStream[String] = follows. 
+ name("jsons") .map(follow => { val MAPPER = StreamsJacksonMapper.getInstance MAPPER.writeValueAsString(follow) - }) - - if( config.getTest == false ) - jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3) - else - jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) - .setParallelism(env.getParallelism) + }). + setParallelism(streamsConfig.getParallelism().toInt) + + val keyed_jsons: KeyedStream[String, Int] = jsons. + name("keyed_jsons"). + setParallelism(streamsConfig.getParallelism().toInt). + keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs ) + + val fileSink : StreamingFileSink[String] = StreamingFileSink. + forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")). + build() + + if( config.getTest == true ) { + keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) + } else { + keyed_jsons.name("fileSink").addSink(fileSink) + } - // if( test == true ) jsons.print(); + val result: JobExecutionResult = env.execute("FlinkTwitterFollowingPipeline") - env.execute(STREAMS_ID) - } + LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult) - class FollowingCollectorFlatMapFunction( - twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator(classOf[TwitterFollowingConfiguration]).detectConfiguration(), - flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration() - ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable { + LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime()) - override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = { - collectConnections(input, out) - } + LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults())) - def collectConnections(id : String, out : Collector[StreamsDatum]) = { - val twitProvider: TwitterFollowingProvider = - new 
TwitterFollowingProvider( - twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration] - ) - twitProvider.prepare(twitProvider) - twitProvider.startStream() - var iterator: Iterator[StreamsDatum] = null - do { - Uninterruptibles.sleepUninterruptibly(flinkConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS) - twitProvider.readCurrent().iterator().toList.map(out.collect(_)) - } while( twitProvider.isRunning ) - } } } \ No newline at end of file diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala index 28d2576..14eebc5 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterPostsPipeline.scala @@ -24,24 +24,31 @@ import java.util.concurrent.TimeUnit import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.util.concurrent.Uninterruptibles import org.apache.commons.lang3.StringUtils +import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.api.scala._ import org.apache.flink.core.fs.FileSystem +import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment} -import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink +import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink +import org.apache.flink.streaming.api.scala.DataStream +import org.apache.flink.streaming.api.scala.KeyedStream +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector -import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} +import org.apache.streams.config.ComponentConfigurator +import org.apache.streams.config.StreamsConfigurator import org.apache.streams.core.StreamsDatum import org.apache.streams.examples.flink.FlinkBase import org.apache.streams.examples.flink.twitter.TwitterPostsPipelineConfiguration -import org.apache.streams.flink.FlinkStreamingConfiguration -import org.apache.streams.hdfs.{HdfsReaderConfiguration, HdfsWriterConfiguration} +import org.apache.streams.hdfs.HdfsReaderConfiguration +import org.apache.streams.hdfs.HdfsWriterConfiguration import org.apache.streams.jackson.StreamsJacksonMapper import org.apache.streams.twitter.pojo.Tweet import org.apache.streams.twitter.provider.TwitterTimelineProvider import org.hamcrest.MatcherAssert -import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.Logger +import org.slf4j.LoggerFactory import scala.collection.JavaConversions._ @@ -57,8 +64,12 @@ object FlinkTwitterPostsPipeline extends FlinkBase { private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance() override def main(args: Array[String]) = { - super.main(args) - val jobConfig = new ComponentConfigurator[TwitterPostsPipelineConfiguration](classOf[TwitterPostsPipelineConfiguration]).detectConfiguration(typesafe) + if( args.length > 0 ) { + LOGGER.info("Args: {}", args) + configUrl = args(0) + } + if( !setup(configUrl) ) System.exit(1) + val jobConfig = new StreamsConfigurator(classOf[TwitterPostsPipelineConfiguration]).detectCustomConfiguration() if( !setup(jobConfig) ) System.exit(1) val pipeline: FlinkTwitterPostsPipeline = new FlinkTwitterPostsPipeline(jobConfig) val thread = new 
Thread(pipeline) @@ -118,6 +129,7 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new val env: StreamExecutionEnvironment = streamEnvironment(config) + env.setParallelism(streamsConfig.getParallelism().toInt) env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) env.setNumberOfExecutionRetries(0) @@ -125,13 +137,15 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new val outPath = buildWriterPath(new ComponentConfigurator(classOf[HdfsWriterConfiguration]).detectConfiguration()) - val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids") + val ids: DataStream[String] = env.readTextFile(inPath).name("ids") - val keyed_ids: KeyedStream[String, Int] = env.readTextFile(inPath).setParallelism(10).name("keyed_ids").keyBy( id => (id.hashCode % 100).abs ) + val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids"). + setParallelism(streamsConfig.getParallelism().toInt). + keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs ) // these datums contain 'Tweet' objects val tweetDatums: DataStream[StreamsDatum] = - keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(10).name("tweetDatums") + keyed_ids.flatMap(new postCollectorFlatMapFunction).setParallelism(env.getParallelism).name("tweetDatums") val tweets: DataStream[Tweet] = tweetDatums .map(datum => datum.getDocument.asInstanceOf[Tweet]).name("tweets") @@ -142,15 +156,28 @@ class FlinkTwitterPostsPipeline(config: TwitterPostsPipelineConfiguration = new MAPPER.writeValueAsString(tweet) }).name("json") - if( config.getTest == false ) - jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs") - else - jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) - .setParallelism(env.getParallelism) + val keyed_jsons: KeyedStream[String, Int] = jsons. + setParallelism(streamsConfig.getParallelism().toInt). 
+ keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs ) + + val fileSink : StreamingFileSink[String] = StreamingFileSink. + forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")). + build() + + if( config.getTest == true ) { + keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) + } else { + keyed_jsons.addSink(fileSink) + } + + val result: JobExecutionResult = env.execute("FlinkTwitterPostsPipeline") + + LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult) + + LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime()) - // if( test == true ) jsons.print(); + LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults())) - env.execute(STREAMS_ID) } class postCollectorFlatMapFunction extends RichFlatMapFunction[String, StreamsDatum] with Serializable { diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala index 79c95d6..6e38591 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterSpritzerPipeline.scala @@ -25,26 +25,32 @@ import java.util.concurrent.TimeUnit import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.util.concurrent.Uninterruptibles import org.apache.commons.lang3.StringUtils -import org.apache.flink.api.common.functions.StoppableFunction +import org.apache.flink.api.common.JobExecutionResult +import 
org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.core.fs.FileSystem +import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction} -import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} -import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink -import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink +import org.apache.flink.streaming.api.functions.source.RichSourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.scala.KeyedStream +import org.apache.flink.streaming.api.scala.DataStream +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.streams.config.ComponentConfigurator +import org.apache.streams.config.StreamsConfigurator import org.apache.streams.core.StreamsDatum import org.apache.streams.examples.flink.FlinkBase import org.apache.streams.examples.flink.twitter.TwitterSpritzerPipelineConfiguration -import org.apache.streams.flink.FlinkStreamingConfiguration import org.apache.streams.hdfs.HdfsWriterConfiguration import org.apache.streams.jackson.StreamsJacksonMapper import org.apache.streams.twitter.config.TwitterStreamConfiguration import org.apache.streams.twitter.converter.TwitterDateTimeFormat import org.apache.streams.twitter.provider.TwitterStreamProvider import org.hamcrest.MatcherAssert -import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.Logger +import org.slf4j.LoggerFactory import scala.collection.JavaConversions._ @@ -122,25 +128,37 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration val outPath = 
buildWriterPath(new ComponentConfigurator(classOf[HdfsWriterConfiguration]).detectConfiguration()) - val streamSource : DataStream[String] = env.addSource(spritzerSource) + val jsons : DataStream[String] = env.addSource(spritzerSource) - if( config.getTest == false ) - streamSource.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs") - else - streamSource.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) - .setParallelism(env.getParallelism) + val keyed_jsons: KeyedStream[String, Int] = jsons. + setParallelism(streamsConfig.getParallelism().toInt). + keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs ) - // if( test == true ) jsons.print(); + val fileSink : StreamingFileSink[String] = StreamingFileSink. + forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")). + build() - env.execute(STREAMS_ID) + if( config.getTest == true ) { + keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) + } else { + keyed_jsons.addSink(fileSink) + } + + val result: JobExecutionResult = env.execute("FlinkTwitterSpritzerPipeline") + + LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult) + + LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime()) + + LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults())) } def stop(): Unit = { - spritzerSource.stop() + spritzerSource.cancel() } - class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable with StoppableFunction { + class SpritzerSource(sourceConfig: TwitterStreamConfiguration) extends RichSourceFunction[String] with Serializable /*with StoppableFunction*/ { var mapper: ObjectMapper = _ @@ -167,9 +185,9 @@ class FlinkTwitterSpritzerPipeline(config: TwitterSpritzerPipelineConfiguration close() } - override def stop(): Unit = { - close() - } +// override def stop(): Unit = { +// close() +// } } diff --git
a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala index 977fdf5..32cf232 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FlinkTwitterUserInformationPipeline.scala @@ -24,26 +24,34 @@ import java.util.concurrent.TimeUnit import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.util.concurrent.Uninterruptibles import org.apache.commons.lang3.StringUtils +import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.api.scala._ import org.apache.flink.core.fs.FileSystem +import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink +import org.apache.flink.streaming.api.scala.DataStream +import org.apache.flink.streaming.api.scala.KeyedStream +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.scala.WindowedStream import org.apache.flink.streaming.api.scala.function.WindowFunction -import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream} import org.apache.flink.streaming.api.windowing.windows.GlobalWindow -import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink import 
org.apache.flink.util.Collector -import org.apache.streams.config.{ComponentConfigurator, StreamsConfigurator} +import org.apache.streams.config.ComponentConfigurator +import org.apache.streams.config.StreamsConfigurator import org.apache.streams.core.StreamsDatum import org.apache.streams.examples.flink.FlinkBase import org.apache.streams.examples.flink.twitter.TwitterUserInformationPipelineConfiguration -import org.apache.streams.flink.FlinkStreamingConfiguration -import org.apache.streams.hdfs.{HdfsReaderConfiguration, HdfsWriterConfiguration} +import org.apache.streams.hdfs.HdfsReaderConfiguration +import org.apache.streams.hdfs.HdfsWriterConfiguration import org.apache.streams.jackson.StreamsJacksonMapper import org.apache.streams.twitter.pojo.User import org.apache.streams.twitter.provider.TwitterUserInformationProvider import org.hamcrest.MatcherAssert -import org.slf4j.{Logger, LoggerFactory} +import org.slf4j.Logger +import org.slf4j.LoggerFactory import scala.collection.JavaConversions._ @@ -59,8 +67,12 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase { private val MAPPER: ObjectMapper = StreamsJacksonMapper.getInstance() override def main(args: Array[String]) = { - super.main(args) - val jobConfig = new ComponentConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectConfiguration(typesafe) + if( args.length > 0 ) { + LOGGER.info("Args: {}", args) + configUrl = args(0) + } + if( !setup(configUrl) ) System.exit(1) + val jobConfig = new StreamsConfigurator(classOf[TwitterUserInformationPipelineConfiguration]).detectCustomConfiguration() if( !setup(jobConfig) ) System.exit(1) val pipeline: FlinkTwitterUserInformationPipeline = new FlinkTwitterUserInformationPipeline(jobConfig) val thread = new Thread(pipeline) @@ -115,11 +127,13 @@ object FlinkTwitterUserInformationPipeline extends FlinkBase { class FlinkTwitterUserInformationPipeline(config: 
TwitterUserInformationPipelineConfiguration = new StreamsConfigurator[TwitterUserInformationPipelineConfiguration](classOf[TwitterUserInformationPipelineConfiguration]).detectCustomConfiguration()) extends Runnable with java.io.Serializable { import FlinkTwitterUserInformationPipeline._ + //import FlinkBase.streamsConfig override def run(): Unit = { val env: StreamExecutionEnvironment = streamEnvironment(config) + env.setParallelism(streamsConfig.getParallelism().toInt) env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) env.setNumberOfExecutionRetries(0) @@ -127,7 +141,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline val outPath = buildWriterPath(new ComponentConfigurator(classOf[HdfsWriterConfiguration]).detectConfiguration()) - val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(10).name("ids") + val ids: DataStream[String] = env.readTextFile(inPath).setParallelism(env.getParallelism).name("ids") val keyed_ids: KeyedStream[String, Int] = ids.name("keyed_ids").keyBy( id => (id.hashCode % 100).abs ) @@ -135,7 +149,7 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline val idLists: DataStream[List[String]] = idWindows.apply[List[String]] (new idListWindowFunction()).name("idLists") - val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(10).name("userDatums") + val userDatums: DataStream[StreamsDatum] = idLists.flatMap(new profileCollectorFlatMapFunction).setParallelism(env.getParallelism).name("userDatums") val user: DataStream[User] = userDatums.map(datum => datum.getDocument.asInstanceOf[User]).name("users") @@ -145,15 +159,29 @@ class FlinkTwitterUserInformationPipeline(config: TwitterUserInformationPipeline MAPPER.writeValueAsString(user) }).name("jsons") - if( config.getTest == false ) - jsons.addSink(new BucketingSink[String](outPath)).setParallelism(3).name("hdfs") - else - 
jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) - .setParallelism(env.getParallelism) + val keyed_jsons: KeyedStream[String, Int] = jsons. + setParallelism(streamsConfig.getParallelism().toInt). + keyBy( id => (id.hashCode % streamsConfig.getParallelism().toInt).abs ) + + val fileSink : StreamingFileSink[String] = StreamingFileSink. + forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8")). + build() + + if( config.getTest == true ) { + keyed_jsons.writeAsText(outPath,FileSystem.WriteMode.OVERWRITE) + } else { + keyed_jsons.addSink(fileSink) + } + + val result: JobExecutionResult = env.execute("FlinkTwitterUserInformationPipeline") + + LOGGER.info("JobExecutionResult: {}", result.getJobExecutionResult) + + LOGGER.info("JobExecutionResult.getNetRuntime: {}", result.getNetRuntime()) + + LOGGER.info("JobExecutionResult.getAllAccumulatorResults: {}", MAPPER.writeValueAsString(result.getAllAccumulatorResults())) - LOGGER.info("StreamExecutionEnvironment: {}", env.toString ) - env.execute(STREAMS_ID) } class idListWindowFunction extends WindowFunction[String, List[String], Int, GlobalWindow] { diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala new file mode 100644 index 0000000..1223047 --- /dev/null +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/FollowingCollectorFlatMapFunction.scala @@ -0,0 +1,52 @@ +package org.apache.streams.examples.flink.twitter.collection + +import java.util.concurrent.TimeUnit + +import com.google.common.util.concurrent.Uninterruptibles +import org.apache.flink.api.common.accumulators.IntCounter +import 
org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +import org.apache.streams.config.ComponentConfigurator +import org.apache.streams.config.StreamsConfiguration +import org.apache.streams.core.StreamsDatum +import org.apache.streams.examples.flink.twitter.collection.FlinkTwitterFollowingPipeline.toProviderId +import org.apache.streams.flink.StreamsFlinkConfiguration +import org.apache.streams.twitter.config.TwitterFollowingConfiguration +import org.apache.streams.twitter.provider.TwitterFollowingProvider + +import scala.collection.JavaConversions._ + +class FollowingCollectorFlatMapFunction( + streamsConfiguration : StreamsConfiguration, + twitterConfiguration : TwitterFollowingConfiguration = new ComponentConfigurator(classOf[TwitterFollowingConfiguration]).detectConfiguration(), + flinkConfiguration : StreamsFlinkConfiguration = new ComponentConfigurator(classOf[StreamsFlinkConfiguration]).detectConfiguration() + ) extends RichFlatMapFunction[String, StreamsDatum] with Serializable { + + var size : IntCounter = new IntCounter() + var counter : IntCounter = new IntCounter() + + override def open(parameters: Configuration): Unit = { + getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.size", this.size) + getRuntimeContext().addAccumulator("FlinkTwitterFollowingPipeline.counter", this.counter) + } + + override def flatMap(input: String, out: Collector[StreamsDatum]): Unit = { + size.add(input.size) + collectConnections(input, out) + } + + def collectConnections(id : String, out : Collector[StreamsDatum]) = { + val conf = twitterConfiguration.withInfo(List(toProviderId(id))).asInstanceOf[TwitterFollowingConfiguration] + val twitProvider: TwitterFollowingProvider = new TwitterFollowingProvider(conf) + twitProvider.prepare(conf) + twitProvider.startStream() + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs,
TimeUnit.MILLISECONDS) + val current = twitProvider.readCurrent().iterator().toList + counter.add(current.size) + current.map(out.collect(_)) + } while( twitProvider.isRunning ) + } + +} diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala new file mode 100644 index 0000000..8f21145 --- /dev/null +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/TimelineCollectorFlatMapFunction.scala @@ -0,0 +1,52 @@ +package org.apache.streams.examples.flink.twitter.collection + +import java.util.concurrent.TimeUnit + +import com.google.common.util.concurrent.Uninterruptibles +import org.apache.flink.api.common.accumulators.IntCounter +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +import org.apache.streams.config.StreamsConfiguration +import org.apache.streams.core.StreamsDatum +import org.apache.streams.flink.StreamsFlinkConfiguration +import org.apache.streams.twitter.config.TwitterTimelineProviderConfiguration +import org.apache.streams.twitter.provider.TwitterTimelineProvider + +import scala.collection.JavaConversions._ + +/** + * Created by sblackmon on 6/2/16. 
+ */ +class TimelineCollectorFlatMapFunction( + streamsConfiguration : StreamsConfiguration, + twitterConfiguration : TwitterTimelineProviderConfiguration, + streamsFlinkConfiguration : StreamsFlinkConfiguration + ) extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable { + var size : IntCounter = new IntCounter() + var counter : IntCounter = new IntCounter() + override def open(parameters: Configuration): Unit = { + getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.size", this.size) + getRuntimeContext().addAccumulator("TimelineCollectorFlatMapFunction.counter", this.counter) + } + override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = { + size.add(input.size) + collectPosts(input, out) + } + def collectPosts(ids : List[String], out : Collector[StreamsDatum]) = { + try { + val conf = twitterConfiguration.withInfo(ids).asInstanceOf[TwitterTimelineProviderConfiguration] + val twitProvider: TwitterTimelineProvider = new TwitterTimelineProvider(conf) + twitProvider.prepare(conf) + twitProvider.startStream() + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS) + val current = twitProvider.readCurrent().iterator().toList + counter.add(current.size) + current.map(out.collect(_)) + } while( twitProvider.isRunning ) + } finally { + + } + } +} diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala new file mode 100644 index 0000000..46e0d4a --- /dev/null +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/main/scala/org/apache/streams/examples/flink/twitter/collection/UserInformationCollectorFlatMapFunction.scala @@ -0,0 
+1,48 @@ +package org.apache.streams.examples.flink.twitter.collection + +import java.util.concurrent.TimeUnit + +import com.google.common.util.concurrent.Uninterruptibles +import org.apache.flink.api.common.accumulators.IntCounter +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +import org.apache.streams.config.StreamsConfiguration +import org.apache.streams.core.StreamsDatum +import org.apache.streams.flink.StreamsFlinkConfiguration +import org.apache.streams.twitter.config.TwitterUserInformationConfiguration +import org.apache.streams.twitter.provider.TwitterUserInformationProvider + +import scala.collection.JavaConversions._ + +/** + * Created by sblackmon on 6/2/16. + */ +class UserInformationCollectorFlatMapFunction( + streamsConfiguration : StreamsConfiguration, + twitterConfiguration : TwitterUserInformationConfiguration, + streamsFlinkConfiguration : StreamsFlinkConfiguration + ) extends RichFlatMapFunction[List[String], StreamsDatum] with Serializable { + var size : IntCounter = new IntCounter() + var counter : IntCounter = new IntCounter() + override def open(parameters: Configuration): Unit = { + getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.size", this.size) + getRuntimeContext().addAccumulator("UserInformationCollectorFlatMapFunction.counter", this.counter) + } + override def flatMap(input: List[String], out: Collector[StreamsDatum]): Unit = { + size.add(input.size) + collectProfiles(input, out) + } + def collectProfiles(ids : List[String], out : Collector[StreamsDatum]) = { + val conf = twitterConfiguration.withInfo(ids) + val twitProvider: TwitterUserInformationProvider = new TwitterUserInformationProvider(conf) + twitProvider.prepare(conf) + twitProvider.startStream() + do { + Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getProviderWaitMs, TimeUnit.MILLISECONDS) + val current = 
twitProvider.readCurrent().iterator().toList + counter.add(current.size) + current.map(out.collect(_)) + } while( twitProvider.isRunning ) + } +} diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf index 67f7c9c..ec4dc4e 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFollowersIT.conf @@ -32,9 +32,10 @@ org.apache.streams.twitter.config.TwitterFollowingConfiguration { max_items = 5000 } org.apache.streams.config.StreamsConfiguration { + parallelism = 1 providerWaitMs = 1000 } org.apache.streams.flink.StreamsFlinkConfiguration { local = true test = true -} \ No newline at end of file +} diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf index 724cd43..fd4b1db 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterFollowingPipelineFriendsIT.conf @@ -31,6 +31,7 @@ org.apache.streams.twitter.config.TwitterFollowingConfiguration { ids_only = true } org.apache.streams.config.StreamsConfiguration { + parallelism = 1 providerWaitMs = 1000 } org.apache.streams.flink.StreamsFlinkConfiguration { diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf index 626d8f6..8f5aadf 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterPostsPipelineIT.conf @@ -27,6 +27,7 @@ org.apache.streams.hdfs.HdfsWriterConfiguration { writerPath = "FlinkTwitterPostsPipelineIT" } org.apache.streams.config.StreamsConfiguration { + parallelism = 1 providerWaitMs = 1000 } org.apache.streams.flink.StreamsFlinkConfiguration { diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf index ea8125a..cbdb650 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/FlinkTwitterUserInformationPipelineIT.conf @@ -28,8 +28,15 @@ org.apache.streams.hdfs.HdfsWriterConfiguration { } org.apache.streams.config.StreamsConfiguration { providerWaitMs = 1000 + queueSize = 10000 + batchSize = 1000 + identifier = "FlinkTwitterUserInformationPipelineIT" + parallelism = 1 + providerTimeoutMs = 60000 + shutdownCheckDelay = 30000 + shutdownCheckInterval = 30000 } org.apache.streams.flink.StreamsFlinkConfiguration { local = true test = true -} \ No newline at end of file +} diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml new file mode 100644 index 0000000..bdb250d --- /dev/null +++ 
b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/resources/testng.xml @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" > + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<suite name="ExampleFlinkITs" preserve-order="true"> + + <test name="FlinkTwitterUserInformationPipelineIT"> + <classes> + <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterUserInformationPipelineIT" /> + </classes> + </test> + + <test name="FlinkTwitterPostsPipelineIT"> + <classes> + <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterPostsPipelineIT" /> + </classes> + </test> + + <test name="FlinkTwitterFollowingPipelineFriendsIT"> + <classes> + <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterFollowingPipelineFriendsIT" /> + </classes> + </test> + + <test name="FlinkTwitterFollowingPipelineFollowersIT"> + <classes> + <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterFollowingPipelineFollowersIT" /> + </classes> + </test> + + <test name="FlinkTwitterSpritzerPipelineIT"> + <classes> + <class name="org.apache.streams.examples.flink.twitter.test.FlinkTwitterSpritzerPipelineIT" /> + </classes> + </test> + +</suite> \ No
newline at end of file diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala index 617815e..bfc6940 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFollowersIT.scala @@ -36,7 +36,7 @@ import scala.io.Source /** * FlinkTwitterFollowingPipelineFollowersIT is an integration test for FlinkTwitterFollowingPipeline. */ -class FlinkTwitterFollowingPipelineFollowersIT extends FlatSpec { +class FlinkTwitterFollowingPipelineFollowersIT { private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFollowersIT]) diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala index 5ebea94..6d52d44 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterFollowingPipelineFriendsIT.scala @@ -16,7 +16,7 @@ * under the License. 
*/ -package com.peoplepattern.streams.twitter.collection +package org.apache.streams.examples.flink.twitter.test import java.io.File import java.nio.file.{Files, Paths} @@ -36,7 +36,7 @@ import scala.io.Source /** * FlinkTwitterFollowingPipelineFriendsIT is an integration test for FlinkTwitterFollowingPipeline. */ -class FlinkTwitterFollowingPipelineFriendsIT extends FlatSpec { +class FlinkTwitterFollowingPipelineFriendsIT { private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterFollowingPipelineFriendsIT]) diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala index ed3edf3..a43c758 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterPostsPipelineIT.scala @@ -16,7 +16,7 @@ * under the License. */ -package com.peoplepattern.streams.twitter.collection +package org.apache.streams.examples.flink.twitter.test import java.io.File import java.nio.file.{Files, Paths} @@ -36,7 +36,7 @@ import scala.io.Source /** * FlinkTwitterPostsPipelineIT is an integration test for FlinkTwitterPostsPipeline. 
*/ -class FlinkTwitterPostsPipelineIT extends FlatSpec { +class FlinkTwitterPostsPipelineIT { private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterPostsPipelineIT]) diff --git a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala index 0652bc1..48b876a 100644 --- a/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala +++ b/streams-examples/streams-examples-flink/flink-twitter-collection/src/test/scala/org/apache/streams/examples/flink/twitter/test/FlinkTwitterUserInformationPipelineIT.scala @@ -16,7 +16,7 @@ * under the License. */ -package com.peoplepattern.streams.twitter.collection +package org.apache.streams.examples.flink.twitter.test import java.io.File import java.nio.file.{Files, Paths} @@ -36,7 +36,7 @@ import scala.io.Source /** * FlinkTwitterUserInformationPipelineIT is an integration test for FlinkTwitterUserInformationPipeline. */ -class FlinkTwitterUserInformationPipelineIT extends FlatSpec { +class FlinkTwitterUserInformationPipelineIT { private val LOGGER: Logger = LoggerFactory.getLogger(classOf[FlinkTwitterUserInformationPipelineIT])
