Repository: incubator-samoa
Updated Branches:
  refs/heads/master e4489b0ac -> 243e78217


SAMOA-36: update flink adapter to flink 0.9.0 release


Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/243e7821
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/243e7821
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/243e7821

Branch: refs/heads/master
Commit: 243e78217bb98870ccc4837d54eaabd8e0287802
Parents: e4489b0
Author: Paris Carbone <[email protected]>
Authored: Fri May 29 10:53:23 2015 +0200
Committer: Paris Carbone <[email protected]>
Committed: Tue Jun 30 13:43:22 2015 +0200

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 samoa-flink/pom.xml                             |  7 ++--
 .../impl/FlinkEntranceProcessingItem.java       | 35 +++++++++++---------
 .../topology/impl/FlinkProcessingItem.java      | 21 +++++-------
 4 files changed, 32 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/243e7821/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1599c3f..0dedc02 100644
--- a/pom.xml
+++ b/pom.xml
@@ -140,7 +140,7 @@
         <miniball.version>1.0.3</miniball.version>
         <s4.version>0.6.0-incubating</s4.version>
         <samza.version>0.7.0</samza.version>
-        <flink.version>0.9.0-milestone-1</flink.version>
+        <flink.version>0.9.0</flink.version>
         <slf4j-log4j12.version>1.7.2</slf4j-log4j12.version>
         <slf4j-simple.version>1.7.5</slf4j-simple.version>
         <maven-surefire-plugin.version>2.18</maven-surefire-plugin.version>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/243e7821/samoa-flink/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-flink/pom.xml b/samoa-flink/pom.xml
index f56ac70..1ca3af5 100644
--- a/samoa-flink/pom.xml
+++ b/samoa-flink/pom.xml
@@ -1,15 +1,16 @@
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
   #%L
   SAMOA
   %%
-  Copyright (C) 2013 Yahoo! Inc.
+  Copyright (C) 2014 - 2015 Apache Software Foundation
   %%
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at
-
+  
        http://www.apache.org/licenses/LICENSE-2.0
-
+  
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/243e7821/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java
 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java
index e00874b..45f791a 100644
--- 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java
+++ 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkEntranceProcessingItem.java
@@ -24,11 +24,10 @@ package org.apache.samoa.flink.topology.impl;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.RichSourceFunction;
-import org.apache.flink.util.Collector;
 import org.apache.samoa.core.EntranceProcessor;
 import org.apache.samoa.flink.helpers.Utils;
 import org.apache.samoa.topology.AbstractEntranceProcessingItem;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 
 import java.io.Serializable;
 
@@ -51,29 +50,33 @@ public class FlinkEntranceProcessingItem extends 
AbstractEntranceProcessingItem
                final int compID = getComponentId();
 
                
-               outStream = env.addSource(new RichSourceFunction<SamoaType>() {
-                       volatile boolean canceled;
-                       EntranceProcessor entrProc = proc;
-                       String id = streamId;
+               outStream = env.addSource(new RichSourceFunction() {
 
+                       private volatile boolean isCancelled;
+                       
                        @Override
-                       public void open(Configuration parameters) throws 
Exception {
-                               super.open(parameters);
-                               entrProc.onCreate(compID);
+                       public void run(SourceContext sourceContext) throws 
Exception {
+                               while(!isCancelled && entrProc.hasNext())
+                               {
+                                       
sourceContext.collect(SamoaType.of(entrProc.nextEvent(), id));
+                               }
                        }
 
                        @Override
-                       public void run(Collector<SamoaType> collector) throws 
Exception {
-                               while (!canceled && entrProc.hasNext()) {
-                                       
collector.collect(SamoaType.of(entrProc.nextEvent(), id));
-                               }
+                       public void cancel() {
+                               isCancelled = true;
                        }
 
+                       EntranceProcessor entrProc = proc;
+                       String id = streamId;
+
                        @Override
-                       public void cancel() {
-                               canceled = true;
+                       public void open(Configuration parameters) throws 
Exception {
+                               super.open(parameters);
+                               entrProc.onCreate(compID);
                        }
-               },Utils.tempTypeInfo);
+                       
+               }).returns(Utils.tempTypeInfo);
 
                ((FlinkStream) getOutputStream()).initialise();
        }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/243e7821/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
index 3f5431c..28701df 100644
--- 
a/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
+++ 
b/samoa-flink/src/main/java/org/apache/samoa/flink/topology/impl/FlinkProcessingItem.java
@@ -28,13 +28,14 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.samoa.core.ContentEvent;
 import org.apache.samoa.core.Processor;
 import org.apache.samoa.flink.helpers.Utils;
 import org.apache.samoa.topology.ProcessingItem;
 import org.apache.samoa.topology.Stream;
 import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 
-public class FlinkProcessingItem extends StreamInvokable<SamoaType, SamoaType> 
implements ProcessingItem, FlinkComponent, Serializable {
+public class FlinkProcessingItem extends AbstractUdfStreamOperator<SamoaType, 
FlinkProcessingItem.SamoaDelegateFunction> 
+               implements OneInputStreamOperator<SamoaType, SamoaType>, 
ProcessingItem, FlinkComponent, Serializable {
 
        private static final Logger logger = 
LoggerFactory.getLogger(FlinkProcessingItem.class);
        public static final int MAX_WAIT_TIME_MILLIS = 10000;
@@ -88,7 +90,7 @@ public class FlinkProcessingItem extends 
StreamInvokable<SamoaType, SamoaType> i
        }
 
        public void putToStream(ContentEvent data, Stream targetStream) {
-               collector.collect(SamoaType.of(data, 
targetStream.getStreamId()));
+               output.collect(SamoaType.of(data, targetStream.getStreamId()));
        }
 
        @Override
@@ -106,7 +108,7 @@ public class FlinkProcessingItem extends 
StreamInvokable<SamoaType, SamoaType> i
                                        if (inStream == null) {
                                                inStream = toBeMerged;
                                        } else {
-                                               inStream = 
inStream.merge(toBeMerged);
+                                               inStream = 
inStream.union(toBeMerged);
                                        }
                                } catch (RuntimeException e) {
                                        e.printStackTrace();
@@ -146,11 +148,8 @@ public class FlinkProcessingItem extends 
StreamInvokable<SamoaType, SamoaType> i
        }
 
        @Override
-       public void invoke() throws Exception {
-               while (readNext() != null) {
-                       SamoaType t = nextObject;
-                       fun.processEvent(t.f1);
-               }
+       public void processElement(SamoaType samoaType) throws Exception {
+               fun.processEvent(samoaType.f1);
        }
 
        @Override
@@ -221,10 +220,6 @@ public class FlinkProcessingItem extends 
StreamInvokable<SamoaType, SamoaType> i
                this.onIteration = onIteration;
        }
 
-       public boolean isOnIteration() {
-               return onIteration;
-       }
-
        static class SamoaDelegateFunction implements Function, Serializable {
                private final Processor proc;
 

Reply via email to