Repository: incubator-samoa Updated Branches: refs/heads/master e4489b0ac -> 243e78217
SAMOA-36: update flink adapter to flink 0.9.0 release Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/243e7821 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/243e7821 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/243e7821 Branch: refs/heads/master Commit: 243e78217bb98870ccc4837d54eaabd8e0287802 Parents: e4489b0 Author: Paris Carbone <[email protected]> Authored: Fri May 29 10:53:23 2015 +0200 Committer: Paris Carbone <[email protected]> Committed: Tue Jun 30 13:43:22 2015 +0200 ---------------------------------------------------------------------- pom.xml | 2 +- samoa-flink/pom.xml | 7 ++-- .../impl/FlinkEntranceProcessingItem.java | 35 +++++++++++--------- .../topology/impl/FlinkProcessingItem.java | 21 +++++------- 4 files changed, 32 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/243e7821/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1599c3f..0dedc02 100644 --- a/pom.xml +++ b/pom.xml @@ -140,7 +140,7 @@ <miniball.version>1.0.3</miniball.version> <s4.version>0.6.0-incubating</s4.version> <samza.version>0.7.0</samza.version> - <flink.version>0.9.0-milestone-1</flink.version> + <flink.version>0.9.0</flink.version> <slf4j-log4j12.version>1.7.2</slf4j-log4j12.version> <slf4j-simple.version>1.7.5</slf4j-simple.version> <maven-surefire-plugin.version>2.18</maven-surefire-plugin.version> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/243e7821/samoa-flink/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml index f56ac70..1ca3af5 100644 --- a/samoa-flink/pom.xml +++ b/samoa-flink/pom.xml @@ -1,15 +1,16 @@ +<?xml version="1.0" encoding="UTF-8"?> <!-- #%L SAMOA %% - Copyright (C) 2013 Yahoo! Inc. + Copyright (C) 2014 - 2015 Apache Software Foundation %% Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/243e7821/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java index e00874b..45f791a 100644 --- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java @@ -24,11 +24,10 @@ package org.apache.samoa.flink.topology.impl; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.function.source.RichSourceFunction; -import org.apache.flink.util.Collector; import org.apache.samoa.core.EntranceProcessor; import org.apache.samoa.flink.helpers.Utils; import org.apache.samoa.topology.AbstractEntranceProcessingItem; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.io.Serializable; @@ -51,29 +50,33 @@ public class FlinkEntranceProcessingItem extends AbstractEntranceProcessingItem final int compID = getComponentId(); - outStream = env.addSource(new RichSourceFunction<SamoaType>() { - volatile boolean canceled; - EntranceProcessor entrProc = proc; - String id = streamId; + outStream = env.addSource(new RichSourceFunction() { + private volatile boolean isCancelled; + @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - entrProc.onCreate(compID); + public void run(SourceContext sourceContext) throws Exception { + while(!isCancelled && entrProc.hasNext()) + { + sourceContext.collect(SamoaType.of(entrProc.nextEvent(), id)); + } } @Override - public void run(Collector<SamoaType> collector) throws Exception { - while (!canceled && entrProc.hasNext()) { - collector.collect(SamoaType.of(entrProc.nextEvent(), id)); - } + public void cancel() { + isCancelled = true; } + EntranceProcessor entrProc = proc; + String id = streamId; + @Override - public void cancel() { - canceled = true; + public void open(Configuration parameters) throws Exception { + super.open(parameters); + entrProc.onCreate(compID); } - },Utils.tempTypeInfo); + + }).returns(Utils.tempTypeInfo); ((FlinkStream) getOutputStream()).initialise(); } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/243e7821/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java index 3f5431c..28701df 100644 --- a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java +++ b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java @@ -28,13 +28,14 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.samoa.core.ContentEvent; import org.apache.samoa.core.Processor; import org.apache.samoa.flink.helpers.Utils; import org.apache.samoa.topology.ProcessingItem; import org.apache.samoa.topology.Stream; import org.apache.samoa.utils.PartitioningScheme; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +44,8 @@ import java.util.ArrayList; import java.util.List; -public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> implements ProcessingItem, FlinkComponent, Serializable { +public class FlinkProcessingItem extends AbstractUdfStreamOperator<SamoaType, FlinkProcessingItem.SamoaDelegateFunction> + implements OneInputStreamOperator<SamoaType, SamoaType>, ProcessingItem, FlinkComponent, Serializable { private static final Logger logger = LoggerFactory.getLogger(FlinkProcessingItem.class); public static final int MAX_WAIT_TIME_MILLIS = 10000; @@ -88,7 +90,7 @@ public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> i } public void putToStream(ContentEvent data, Stream targetStream) { - collector.collect(SamoaType.of(data, targetStream.getStreamId())); + output.collect(SamoaType.of(data, targetStream.getStreamId())); } @Override @@ -106,7 +108,7 @@ public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> i if (inStream == null) { inStream = toBeMerged; } else { - inStream = inStream.merge(toBeMerged); + inStream = inStream.union(toBeMerged); } } catch (RuntimeException e) { e.printStackTrace(); @@ -146,11 +148,8 @@ public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> i } @Override - public void invoke() throws Exception { - while (readNext() != null) { - SamoaType t = nextObject; - fun.processEvent(t.f1); - } + public void processElement(SamoaType samoaType) throws Exception { + fun.processEvent(samoaType.f1); } @Override @@ -221,10 +220,6 @@ public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> i this.onIteration = onIteration; } - public boolean isOnIteration() { - return onIteration; - } - static class SamoaDelegateFunction implements Function, Serializable { private final Processor proc;
