http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
new file mode 100755
index 0000000..8afac75
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.util.Collector;
+
+public class FromElementsFunction<T> implements SourceFunction<T> {
+       private static final long serialVersionUID = 1L;
+
+       Iterable<T> iterable;
+
+       public FromElementsFunction(T... elements) {
+               this.iterable = Arrays.asList(elements);
+       }
+
+       public FromElementsFunction(Collection<T> elements) {
+               this.iterable = elements;
+       }
+
+       public FromElementsFunction(Iterable<T> elements) {
+               this.iterable = elements;
+       }
+
+       @Override
+       public void invoke(Collector<T> collector) throws Exception {
+               for (T element : iterable) {
+                       collector.collect(element);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
new file mode 100755
index 0000000..3afd06e
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.NumberSequenceIterator;
+
+/**
+ * Source Function used to generate the number sequence
+ * 
+ */
+public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
+
+       private static final long serialVersionUID = 1L;
+
+       private NumberSequenceIterator fullIterator;
+       private NumberSequenceIterator splitIterator;
+
+       public GenSequenceFunction(long from, long to) {
+               fullIterator = new NumberSequenceIterator(from, to);
+       }
+
+       @Override
+       public void invoke(Collector<Long> collector) throws Exception {
+               while (splitIterator.hasNext()) {
+                       collector.collect(splitIterator.next());
+               }
+       }
+
+       @Override
+       public void open(Configuration config) {
+               int splitNumber = getRuntimeContext().getIndexOfThisSubtask();
+               int numOfSubTasks = 
getRuntimeContext().getNumberOfParallelSubtasks();
+               splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
new file mode 100644
index 0000000..664d39a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+public interface GenericSourceFunction<T> {
+
+       public TypeInformation<T> getType();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
new file mode 100644
index 0000000..46d4fe9
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.util.Collector;
+
+public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
+
+       public void invoke(Collector<OUT> collector) throws Exception;
+               
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
new file mode 100644
index 0000000..5bbfd4c
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+public abstract class RichParallelSourceFunction<OUT> extends 
AbstractRichFunction implements
+               ParallelSourceFunction<OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
new file mode 100755
index 0000000..4b947c7
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+public abstract class RichSourceFunction<OUT> extends AbstractRichFunction 
implements
+               SourceFunction<OUT> {
+
+       private static final long serialVersionUID = 1L;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
new file mode 100644
index 0000000..ac82b10
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+public class SocketTextStreamFunction extends RichSourceFunction<String> {
+       private static final long serialVersionUID = 1L;
+       
+       private String hostname;
+       private int port;
+       private char delimiter;
+       private Socket socket;
+       private static final int CONNECTION_TIMEOUT_TIME = 0;
+
+       public SocketTextStreamFunction(String hostname, int port, char 
delimiter) {
+               this.hostname = hostname;
+               this.port = port;
+               this.delimiter = delimiter;
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               socket = new Socket();
+               
+               socket.connect(new InetSocketAddress(hostname, port), 
CONNECTION_TIMEOUT_TIME);
+       }
+       
+       @Override
+       public void invoke(Collector<String> collector) throws Exception {
+               while (!socket.isClosed() && socket.isConnected()) {
+                       streamFromSocket(collector, socket);
+               }
+       }
+
+       public void streamFromSocket(Collector<String> collector, Socket 
socket) throws Exception {
+               StringBuffer buffer = new StringBuffer();
+               BufferedReader reader = new BufferedReader(new 
InputStreamReader(socket.getInputStream()));
+
+               while (true) {
+                       int data = reader.read();
+                       if (!socket.isConnected() || socket.isClosed() || data 
== -1) {
+                               break;
+                       }
+
+                       if (data == delimiter) {
+                               collector.collect(buffer.toString());
+                               buffer = new StringBuffer();
+                       } else if (data != '\r') { // ignore carriage return
+                               buffer.append((char) data);
+                       }
+               }
+
+               if (buffer.length() > 0) {
+                       collector.collect(buffer.toString());
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               socket.close();
+               super.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
new file mode 100755
index 0000000..917562a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.source;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+public interface SourceFunction<OUT> extends Function, Serializable {
+
+       public void invoke(Collector<OUT> collector) throws Exception;
+               
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
new file mode 100644
index 0000000..24c0319
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.Collector;
+
+public abstract class ChainableInvokable<IN, OUT> extends StreamInvokable<IN, 
OUT> implements
+               Collector<IN> {
+
+       private static final long serialVersionUID = 1L;
+
+       public ChainableInvokable(Function userFunction) {
+               super(userFunction);
+               setChainingStrategy(ChainingStrategy.ALWAYS);
+       }
+
+       public void setup(Collector<OUT> collector, StreamRecordSerializer<IN> 
inSerializer) {
+               this.collector = collector;
+               this.inSerializer = inSerializer;
+               this.objectSerializer = inSerializer.getObjectSerializer();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
new file mode 100755
index 0000000..13a6ba1
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable;
+
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+
+public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
+       private static final long serialVersionUID = 1L;
+
+       private SinkFunction<IN> sinkFunction;
+
+       public SinkInvokable(SinkFunction<IN> sinkFunction) {
+               super(sinkFunction);
+               this.sinkFunction = sinkFunction;
+       }
+
+       @Override
+       public void invoke() throws Exception {
+               while (readNext() != null) {
+                       callUserFunctionAndLogException();
+               }
+       }
+
+       @Override
+       protected void callUserFunction() throws Exception {
+               sinkFunction.invoke(nextObject);
+       }
+
+       @Override
+       public void collect(IN record) {
+               nextObject = copy(record);
+               callUserFunctionAndLogException();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
new file mode 100644
index 0000000..f1cf2c5
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable;
+
+import java.io.Serializable;
+
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+
+public class SourceInvokable<OUT> extends StreamInvokable<OUT, OUT> implements 
Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private SourceFunction<OUT> sourceFunction;
+
+       public SourceInvokable(SourceFunction<OUT> sourceFunction) {
+               super(sourceFunction);
+               this.sourceFunction = sourceFunction;
+       }
+
+       @Override
+       public void invoke() {
+               callUserFunctionAndLogException();
+       }
+
+       @Override
+       protected void callUserFunction() throws Exception {
+               sourceFunction.invoke(collector);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
new file mode 100644
index 0000000..6cee5f2
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The StreamInvokable represents the base class for all invokables in the
+ * streaming topology.
+ * 
+ * @param <OUT>
+ *            The output type of the invokable
+ */
+public abstract class StreamInvokable<IN, OUT> implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamInvokable.class);
+
+       protected StreamTaskContext<OUT> taskContext;
+
+       protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
+       protected StreamRecordSerializer<IN> inSerializer;
+       protected TypeSerializer<IN> objectSerializer;
+       protected StreamRecord<IN> nextRecord;
+       protected IN nextObject;
+       protected boolean isMutable;
+
+       protected Collector<OUT> collector;
+       protected Function userFunction;
+       protected volatile boolean isRunning;
+
+       private ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
+
+       public StreamInvokable(Function userFunction) {
+               this.userFunction = userFunction;
+       }
+
+       /**
+        * Initializes the {@link StreamInvokable} for input and output handling
+        * 
+        * @param taskContext
+        *            StreamTaskContext representing the vertex
+        */
+       public void setup(StreamTaskContext<OUT> taskContext) {
+               this.collector = taskContext.getOutputCollector();
+               this.recordIterator = taskContext.getInput(0);
+               this.inSerializer = taskContext.getInputSerializer(0);
+               if (this.inSerializer != null) {
+                       this.nextRecord = inSerializer.createInstance();
+                       this.objectSerializer = 
inSerializer.getObjectSerializer();
+               }
+               this.taskContext = taskContext;
+       }
+
+       /**
+        * Method that will be called when the operator starts, should encode 
the
+        * processing logic
+        */
+       public abstract void invoke() throws Exception;
+
+       /*
+        * Reads the next record from the reader iterator and stores it in the
+        * nextRecord variable
+        */
+       protected StreamRecord<IN> readNext() {
+               this.nextRecord = inSerializer.createInstance();
+               try {
+                       nextRecord = recordIterator.next(nextRecord);
+                       try {
+                               nextObject = nextRecord.getObject();
+                       } catch (NullPointerException e) {
+                               // end of stream
+                       }
+                       return nextRecord;
+               } catch (IOException e) {
+                       throw new RuntimeException("Could not read next 
record.");
+               }
+       }
+
+       /**
+        * The call of the user implemented function should be implemented here
+        */
+       protected void callUserFunction() throws Exception {
+       }
+
+       /**
+        * Method for logging exceptions thrown during the user function call
+        */
+       protected void callUserFunctionAndLogException() {
+               try {
+                       callUserFunction();
+               } catch (Exception e) {
+                       if (LOG.isErrorEnabled()) {
+                               LOG.error("Calling user function failed due to: 
{}",
+                                               
StringUtils.stringifyException(e));
+                       }
+               }
+       }
+
+       /**
+        * Open method to be used if the user defined function extends the
+        * RichFunction class
+        * 
+        * @param parameters
+        *            The configuration parameters for the operator
+        */
+       public void open(Configuration parameters) throws Exception {
+               isRunning = true;
+               FunctionUtils.openFunction(userFunction, parameters);
+       }
+
+       /**
+        * Close method to be used if the user defined function extends the
+        * RichFunction class
+        * 
+        */
+       public void close() {
+               isRunning = false;
+               collector.close();
+               try {
+                       FunctionUtils.closeFunction(userFunction);
+               } catch (Exception e) {
+                       throw new RuntimeException("Error when closing the 
function: " + e.getMessage());
+               }
+       }
+
+       public void setRuntimeContext(RuntimeContext t) {
+               FunctionUtils.setFunctionRuntimeContext(userFunction, t);
+       }
+
+       protected IN copy(IN record) {
+               return objectSerializer.copy(record);
+       }
+
+       public void setChainingStrategy(ChainingStrategy strategy) {
+               if (strategy == ChainingStrategy.ALWAYS) {
+                       if (!(this instanceof ChainableInvokable)) {
+                               throw new RuntimeException(
+                                               "Invokable needs to extend 
ChainableInvokable to be chained");
+                       }
+               }
+               this.chainingStrategy = strategy;
+       }
+
+       public ChainingStrategy getChainingStrategy() {
+               return chainingStrategy;
+       }
+
+       public static enum ChainingStrategy {
+               ALWAYS, NEVER, HEAD;
+       }
+
+       public Function getUserFunction() {
+               return userFunction;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
new file mode 100755
index 0000000..c3475e9
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+public interface BatchIterator<IN> extends Iterator<IN>, Serializable {
+       public void reset();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
new file mode 100644
index 0000000..3fc314c
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+public class CounterInvokable<IN> extends ChainableInvokable<IN, Long> {
+       private static final long serialVersionUID = 1L;
+
+       Long count = 0L;
+
+       public CounterInvokable() {
+               super(null);
+       }
+
+       @Override
+       public void invoke() throws Exception {
+               while (readNext() != null) {
+                       collector.collect(++count);
+               }
+       }
+
+       @Override
+       public void collect(IN record) {
+               collector.collect(++count);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
new file mode 100644
index 0000000..0c8298e
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
+
+       private static final long serialVersionUID = 1L;
+
+       FilterFunction<IN> filterFunction;
+       private boolean collect;
+
+       public FilterInvokable(FilterFunction<IN> filterFunction) {
+               super(filterFunction);
+               this.filterFunction = filterFunction;
+       }
+
+       @Override
+       public void invoke() throws Exception {
+               while (readNext() != null) {
+                       callUserFunctionAndLogException();
+               }
+       }
+
+       @Override
+       protected void callUserFunction() throws Exception {
+               collect = filterFunction.filter(copy(nextObject));
+               if (collect) {
+                       collector.collect(nextObject);
+               }
+       }
+
+       @Override
+       public void collect(IN record) {
+               nextObject = copy(record);
+               callUserFunctionAndLogException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
new file mode 100644
index 0000000..2a4081b
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
+       private static final long serialVersionUID = 1L;
+
+       private FlatMapFunction<IN, OUT> flatMapper;
+
+       public FlatMapInvokable(FlatMapFunction<IN, OUT> flatMapper) {
+               super(flatMapper);
+               this.flatMapper = flatMapper;
+       }
+
+       @Override
+       public void invoke() throws Exception {
+               while (readNext() != null) {
+                       callUserFunctionAndLogException();
+               }
+       }
+
+       @Override
+       protected void callUserFunction() throws Exception {
+               flatMapper.flatMap(nextObject, collector);
+       }
+
+       @Override
+       public void collect(IN record) {
+               nextObject = copy(record);
+               callUserFunctionAndLogException();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
new file mode 100755
index 0000000..c2177fa
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+
+public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
+       private static final long serialVersionUID = 1L;
+
+       private KeySelector<IN, ?> keySelector;
+       private Map<Object, IN> values;
+       private IN reduced;
+
+       public GroupedReduceInvokable(ReduceFunction<IN> reducer, 
KeySelector<IN, ?> keySelector) {
+               super(reducer);
+               this.keySelector = keySelector;
+               values = new HashMap<Object, IN>();
+       }
+
+       @Override
+       protected void reduce() throws Exception {
+               Object key = nextRecord.getKey(keySelector);
+               currentValue = values.get(key);
+               nextValue = nextObject;
+               if (currentValue != null) {
+                       callUserFunctionAndLogException();
+                       values.put(key, reduced);
+                       collector.collect(reduced);
+               } else {
+                       values.put(key, nextValue);
+                       collector.collect(nextValue);
+               }
+       }
+
+       @Override
+       protected void callUserFunction() throws Exception {
+               reduced = reducer.reduce(currentValue, nextValue);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
new file mode 100644
index 0000000..997463c
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/**
+ * This invokable allows windowing based on {@link TriggerPolicy} and
+ * {@link EvictionPolicy} instances including their active and cloneable
+ * versions. It is additionally aware of the creation of windows per group.
+ * 
+ * A {@link KeySelector} is used to specify the key position or key extraction.
+ * The {@link ReduceFunction} will be executed on each group separately.
+ * Policies might either be centralized or distributed. It is not possible to
+ * use central and distributed eviction policies at the same time. A 
distributed
+ * policy have to be a {@link CloneableTriggerPolicy} or
+ * {@link CloneableEvictionPolicy} as it will be cloned to have separated
+ * instances for each group. At the startup time the distributed policies will
+ * be stored as sample, and only clones of them will be used to maintain the
+ * groups. Therefore, each group starts with the initial policy states.
+ * 
+ * While a distributed policy only gets notified with the elements belonging to
+ * the respective group, a centralized policy get notified with all arriving
+ * elements. When a centralized trigger occurred, all groups get triggered. 
This
+ * is done by submitting the element which caused the trigger as real element 
to
+ * the groups it belongs to and as fake element to all other groups. Within the
+ * groups the element might be further processed, causing more triggers,
+ * prenotifications of active distributed policies and evictions like usual.
+ * 
+ * Central policies can be instance of {@link ActiveTriggerPolicy} and also
+ * implement the
+ * {@link 
ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
+ * method. Fake elements created on prenotification will be forwarded to all
+ * groups. The {@link ActiveTriggerCallback} is also implemented in a way, that
+ * it forwards/distributed calls all groups.
+ * 
+ * @param <IN>
+ *            The type of input elements handled by this operator invokable.
+ */
+public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
+
+       /**
+        * Auto-generated serial version UID
+        */
+       private static final long serialVersionUID = -3469545957144404137L;
+
+       private KeySelector<IN, ?> keySelector;
+       private Configuration parameters;
+       private LinkedList<ActiveTriggerPolicy<IN>> 
activeCentralTriggerPolicies;
+       private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies;
+       private LinkedList<ActiveEvictionPolicy<IN>> 
activeCentralEvictionPolicies;
+       private LinkedList<EvictionPolicy<IN>> centralEvictionPolicies;
+       private LinkedList<CloneableTriggerPolicy<IN>> 
distributedTriggerPolicies;
+       private LinkedList<CloneableEvictionPolicy<IN>> 
distributedEvictionPolicies;
+       private Map<Object, WindowInvokable<IN, OUT>> windowingGroups;
+       private LinkedList<Thread> activePolicyThreads;
+       private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
+       private LinkedList<WindowInvokable<IN, OUT>> 
deleteOrderForCentralEviction;
+
+       /**
+        * This constructor creates an instance of the grouped windowing 
invokable.
+        * 
+        * A {@link KeySelector} is used to specify the key position or key
+        * extraction. The {@link ReduceFunction} will be executed on each group
+        * separately. Policies might either be centralized or distributed. It 
is
+        * not possible to use central and distributed eviction policies at the 
same
+        * time. A distributed policy have to be a {@link 
CloneableTriggerPolicy} or
+        * {@link CloneableEvictionPolicy} as it will be cloned to have 
separated
+        * instances for each group. At the startup time the distributed 
policies
+        * will be stored as sample, and only clones of them will be used to
+        * maintain the groups. Therefore, each group starts with the initial 
policy
+        * states.
+        * 
+        * While a distributed policy only gets notified with the elements 
belonging
+        * to the respective group, a centralized policy get notified with all
+        * arriving elements. When a centralized trigger occurred, all groups 
get
+        * triggered. This is done by submitting the element which caused the
+        * trigger as real element to the groups it belongs to and as fake 
element
+        * to all other groups. Within the groups the element might be further
+        * processed, causing more triggers, prenotifications of active 
distributed
+        * policies and evictions like usual.
+        * 
+        * Central policies can be instance of {@link ActiveTriggerPolicy} and 
also
+        * implement the
+        * {@link 
ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
+        * method. Fake elements created on prenotification will be forwarded 
to all
+        * groups. The {@link ActiveTriggerCallback} is also implemented in a 
way,
+        * that it forwards/distributed calls all groups.
+        * 
+        * @param userFunction
+        *            The user defined function.
+        * @param keySelector
+        *            A key selector to extract the key for the groups from the
+        *            input data.
+        * @param distributedTriggerPolicies
+        *            Trigger policies to be distributed and maintained 
individually
+        *            within each group.
+        * @param distributedEvictionPolicies
+        *            Eviction policies to be distributed and maintained
+        *            individually within each group. Note that there cannot be
+        *            both, central and distributed eviction policies at the 
same
+        *            time.
+        * @param centralTriggerPolicies
+        *            Trigger policies which will only exist once at a central
+        *            place. In case a central policy triggers, it will cause 
all
+        *            groups to be emitted. (Remark: Empty groups cannot be 
emitted.
+        *            If only one element is contained a group, this element 
itself
+        *            is returned as aggregated result.)
+        * @param centralEvictionPolicies
+        *            Eviction which will only exist once at a central place. 
Note
+        *            that there cannot be both, central and distributed 
eviction
+        *            policies at the same time. The central eviction policy 
will
+        *            work on an simulated element buffer containing all 
elements no
+        *            matter which group they belong to.
+        */
+       public GroupedWindowInvokable(Function userFunction, KeySelector<IN, ?> 
keySelector,
+                       LinkedList<CloneableTriggerPolicy<IN>> 
distributedTriggerPolicies,
+                       LinkedList<CloneableEvictionPolicy<IN>> 
distributedEvictionPolicies,
+                       LinkedList<TriggerPolicy<IN>> centralTriggerPolicies,
+                       LinkedList<EvictionPolicy<IN>> centralEvictionPolicies) 
{
+
+               super(userFunction);
+
+               this.keySelector = keySelector;
+
+               // handle the triggers
+               if (centralTriggerPolicies != null) {
+                       this.centralTriggerPolicies = centralTriggerPolicies;
+                       this.activeCentralTriggerPolicies = new 
LinkedList<ActiveTriggerPolicy<IN>>();
+
+                       for (TriggerPolicy<IN> trigger : 
centralTriggerPolicies) {
+                               if (trigger instanceof ActiveTriggerPolicy) {
+                                       
this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
+                               }
+                       }
+               } else {
+                       this.centralTriggerPolicies = new 
LinkedList<TriggerPolicy<IN>>();
+               }
+
+               if (distributedTriggerPolicies != null) {
+                       this.distributedTriggerPolicies = 
distributedTriggerPolicies;
+               } else {
+                       this.distributedTriggerPolicies = new 
LinkedList<CloneableTriggerPolicy<IN>>();
+               }
+
+               if (distributedEvictionPolicies != null) {
+                       this.distributedEvictionPolicies = 
distributedEvictionPolicies;
+               } else {
+                       this.distributedEvictionPolicies = new 
LinkedList<CloneableEvictionPolicy<IN>>();
+               }
+
+               this.activeCentralEvictionPolicies = new 
LinkedList<ActiveEvictionPolicy<IN>>();
+
+               if (centralEvictionPolicies != null) {
+                       this.centralEvictionPolicies = centralEvictionPolicies;
+
+                       for (EvictionPolicy<IN> eviction : 
centralEvictionPolicies) {
+                               if (eviction instanceof ActiveEvictionPolicy) {
+                                       
this.activeCentralEvictionPolicies.add((ActiveEvictionPolicy<IN>) eviction);
+                               }
+                       }
+               } else {
+                       this.centralEvictionPolicies = new 
LinkedList<EvictionPolicy<IN>>();
+               }
+
+               this.windowingGroups = new HashMap<Object, WindowInvokable<IN, 
OUT>>();
+               this.activePolicyThreads = new LinkedList<Thread>();
+               this.currentTriggerPolicies = new 
LinkedList<TriggerPolicy<IN>>();
+               this.deleteOrderForCentralEviction = new 
LinkedList<WindowInvokable<IN, OUT>>();
+
+               // check that not both, central and distributed eviction, is 
used at the
+               // same time.
+               if (!this.centralEvictionPolicies.isEmpty() && 
!this.distributedEvictionPolicies.isEmpty()) {
+                       throw new UnsupportedOperationException(
+                                       "You can only use either central or 
distributed eviction policies but not both at the same time.");
+               }
+
+               // Check that there is at least one trigger and one eviction 
policy
+               if (this.centralEvictionPolicies.isEmpty() && 
this.distributedEvictionPolicies.isEmpty()) {
+                       throw new UnsupportedOperationException(
+                                       "You have to define at least one 
eviction policy");
+               }
+               if (this.centralTriggerPolicies.isEmpty() && 
this.distributedTriggerPolicies.isEmpty()) {
+                       throw new UnsupportedOperationException(
+                                       "You have to define at least one 
trigger policy");
+               }
+
+       }
+
+       @Override
+       public void invoke() throws Exception {
+               // Prevent empty data streams
+               if (readNext() == null) {
+                       throw new RuntimeException("DataStream must not be 
empty");
+               }
+
+               // Continuously run
+               while (nextRecord != null) {
+                       WindowInvokable<IN, OUT> groupInvokable = 
windowingGroups.get(keySelector
+                                       .getKey(nextRecord.getObject()));
+                       if (groupInvokable == null) {
+                               groupInvokable = makeNewGroup(nextRecord);
+                       }
+
+                       // Run the precalls for central active triggers
+                       for (ActiveTriggerPolicy<IN> trigger : 
activeCentralTriggerPolicies) {
+                               Object[] result = 
trigger.preNotifyTrigger(nextRecord.getObject());
+                               for (Object in : result) {
+
+                                       // If central eviction is used, handle 
it here
+                                       if 
(!activeCentralEvictionPolicies.isEmpty()) {
+                                               
evictElements(centralActiveEviction(in));
+                                       }
+
+                                       // process in groups
+                                       for (WindowInvokable<IN, OUT> group : 
windowingGroups.values()) {
+                                               group.processFakeElement(in, 
trigger);
+                                               checkForEmptyGroupBuffer(group);
+                                       }
+                               }
+                       }
+
+                       // Process non-active central triggers
+                       for (TriggerPolicy<IN> triggerPolicy : 
centralTriggerPolicies) {
+                               if 
(triggerPolicy.notifyTrigger(nextRecord.getObject())) {
+                                       
currentTriggerPolicies.add(triggerPolicy);
+                               }
+                       }
+
+                       if (currentTriggerPolicies.isEmpty()) {
+
+                               // only add the element to its group
+                               
groupInvokable.processRealElement(nextRecord.getObject());
+                               checkForEmptyGroupBuffer(groupInvokable);
+
+                               // If central eviction is used, handle it here
+                               if (!centralEvictionPolicies.isEmpty()) {
+                                       
evictElements(centralEviction(nextRecord.getObject(), false));
+                                       
deleteOrderForCentralEviction.add(groupInvokable);
+                               }
+
+                       } else {
+
+                               // call user function for all groups
+                               for (WindowInvokable<IN, OUT> group : 
windowingGroups.values()) {
+                                       if (group == groupInvokable) {
+                                               // process real with 
initialized policies
+                                               
group.processRealElement(nextRecord.getObject(), currentTriggerPolicies);
+                                       } else {
+                                               // process like a fake but also 
initialized with
+                                               // policies
+                                               
group.externalTriggerFakeElement(nextRecord.getObject(),
+                                                               
currentTriggerPolicies);
+                                       }
+
+                                       // remove group in case it has an empty 
buffer
+                                       // checkForEmptyGroupBuffer(group);
+                               }
+
+                               // If central eviction is used, handle it here
+                               if (!centralEvictionPolicies.isEmpty()) {
+                                       
evictElements(centralEviction(nextRecord.getObject(), true));
+                                       
deleteOrderForCentralEviction.add(groupInvokable);
+                               }
+                       }
+
+                       // clear current trigger list
+                       currentTriggerPolicies.clear();
+
+                       // read next record
+                       readNext();
+               }
+
+               // Stop all remaining threads from policies
+               for (Thread t : activePolicyThreads) {
+                       t.interrupt();
+               }
+
+               // finally trigger the buffer.
+               for (WindowInvokable<IN, OUT> group : windowingGroups.values()) 
{
+                       group.emitFinalWindow(centralTriggerPolicies);
+               }
+
+       }
+
+       /**
+        * This method creates a new group. The method gets called in case an
+        * element arrives which has a key which was not seen before. The method
+        * created a nested {@link WindowInvokable} and therefore created 
clones of
+        * all distributed trigger and eviction policies.
+        * 
+        * @param element
+        *            The element which leads to the generation of a new group
+        *            (previously unseen key)
+        * @throws Exception
+        *             In case the {@link KeySelector} throws an exception in
+        *             {@link KeySelector#getKey(Object)}, the exception is not
+        *             catched by this method.
+        */
+       @SuppressWarnings("unchecked")
+       private WindowInvokable<IN, OUT> makeNewGroup(StreamRecord<IN> element) 
throws Exception {
+               // clone the policies
+               LinkedList<TriggerPolicy<IN>> clonedDistributedTriggerPolicies 
= new LinkedList<TriggerPolicy<IN>>();
+               LinkedList<EvictionPolicy<IN>> 
clonedDistributedEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
+               for (CloneableTriggerPolicy<IN> trigger : 
this.distributedTriggerPolicies) {
+                       clonedDistributedTriggerPolicies.add(trigger.clone());
+               }
+               for (CloneableEvictionPolicy<IN> eviction : 
this.distributedEvictionPolicies) {
+                       clonedDistributedEvictionPolicies.add(eviction.clone());
+               }
+
+               WindowInvokable<IN, OUT> groupInvokable;
+               if (userFunction instanceof ReduceFunction) {
+                       groupInvokable = (WindowInvokable<IN, OUT>) new 
WindowReduceInvokable<IN>(
+                                       (ReduceFunction<IN>) userFunction, 
clonedDistributedTriggerPolicies,
+                                       clonedDistributedEvictionPolicies);
+               } else {
+                       groupInvokable = new WindowGroupReduceInvokable<IN, 
OUT>(
+                                       (GroupReduceFunction<IN, OUT>) 
userFunction, clonedDistributedTriggerPolicies,
+                                       clonedDistributedEvictionPolicies);
+               }
+
+               groupInvokable.setup(taskContext);
+               groupInvokable.open(this.parameters);
+               windowingGroups.put(keySelector.getKey(element.getObject()), 
groupInvokable);
+
+               return groupInvokable;
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               this.parameters = parameters;
+               for (ActiveTriggerPolicy<IN> tp : activeCentralTriggerPolicies) 
{
+                       Runnable target = tp.createActiveTriggerRunnable(new 
WindowingCallback(tp));
+                       if (target != null) {
+                               Thread thread = new Thread(target);
+                               activePolicyThreads.add(thread);
+                               thread.start();
+                       }
+               }
+       };
+
+       /**
+        * This method is used to notify central eviction policies with a real
+        * element.
+        * 
+        * @param input
+        *            the real element to notify the eviction policy.
+        * @param triggered
+        *            whether a central trigger occurred or not.
+        * @return The number of elements to be deleted from the buffer.
+        */
+       private int centralEviction(IN input, boolean triggered) {
+               // Process the evictions and take care of double evictions
+               // In case there are multiple eviction policies present,
+               // only the one with the highest return value is recognized.
+               int currentMaxEviction = 0;
+               for (EvictionPolicy<IN> evictionPolicy : 
centralEvictionPolicies) {
+                       // use temporary variable to prevent multiple calls to
+                       // notifyEviction
+                       int tmp = evictionPolicy.notifyEviction(input, 
triggered,
+                                       deleteOrderForCentralEviction.size());
+                       if (tmp > currentMaxEviction) {
+                               currentMaxEviction = tmp;
+                       }
+               }
+               return currentMaxEviction;
+       }
+
+       /**
+        * This method is used to notify active central eviction policies with a
+        * fake element.
+        * 
+        * @param input
+        *            the fake element to notify the active central eviction
+        *            policies.
+        * @return The number of elements to be deleted from the buffer.
+        */
+       private int centralActiveEviction(Object input) {
+               // Process the evictions and take care of double evictions
+               // In case there are multiple eviction policies present,
+               // only the one with the highest return value is recognized.
+               int currentMaxEviction = 0;
+               for (ActiveEvictionPolicy<IN> evictionPolicy : 
activeCentralEvictionPolicies) {
+                       // use temporary variable to prevent multiple calls to
+                       // notifyEviction
+                       int tmp = 
evictionPolicy.notifyEvictionWithFakeElement(input,
+                                       deleteOrderForCentralEviction.size());
+                       if (tmp > currentMaxEviction) {
+                               currentMaxEviction = tmp;
+                       }
+               }
+               return currentMaxEviction;
+       }
+
+       /**
+        * This method is used in central eviction to delete a given number of
+        * elements from the buffer.
+        * 
+        * @param numToEvict
+        *            number of elements to delete from the virtual central 
element
+        *            buffer.
+        */
+       private void evictElements(int numToEvict) {
+               HashSet<WindowInvokable<IN, OUT>> usedGroups = new 
HashSet<WindowInvokable<IN, OUT>>();
+               for (; numToEvict > 0; numToEvict--) {
+                       WindowInvokable<IN, OUT> currentGroup = 
deleteOrderForCentralEviction.getFirst();
+                       // Do the eviction
+                       currentGroup.evictFirst();
+                       // Remember groups which possibly have an empty buffer 
after the
+                       // eviction
+                       usedGroups.add(currentGroup);
+                       try {
+                               deleteOrderForCentralEviction.removeFirst();
+                       } catch (NoSuchElementException e) {
+                               // when buffer is empty, ignore exception and 
stop deleting
+                               break;
+                       }
+
+               }
+
+               // Remove groups with empty buffer
+               for (WindowInvokable<IN, OUT> group : usedGroups) {
+                       checkForEmptyGroupBuffer(group);
+               }
+       }
+
+       /**
+        * Checks if the element buffer of a given windowing group is empty. If 
so,
+        * the group will be deleted.
+        * 
+        * @param group
+        *            The windowing group to be checked and and removed in case 
its
+        *            buffer is empty.
+        */
+       private void checkForEmptyGroupBuffer(WindowInvokable<IN, OUT> group) {
+               if (group.isBufferEmpty()) {
+                       windowingGroups.remove(group);
+               }
+       }
+
+       /**
+        * This callback class allows to handle the callbacks done by threads
+        * defined in active trigger policies
+        * 
+        * @see 
ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)
+        */
+       private class WindowingCallback implements ActiveTriggerCallback {
+               private ActiveTriggerPolicy<IN> policy;
+
+               public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
+                       this.policy = policy;
+               }
+
+               @Override
+               public void sendFakeElement(Object datapoint) {
+
+                       // If central eviction is used, handle it here
+                       if (!centralEvictionPolicies.isEmpty()) {
+                               evictElements(centralActiveEviction(datapoint));
+                       }
+
+                       // handle element in groups
+                       for (WindowInvokable<IN, OUT> group : 
windowingGroups.values()) {
+                               group.processFakeElement(datapoint, policy);
+                               checkForEmptyGroupBuffer(group);
+                       }
+               }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
new file mode 100644
index 0000000..7c8e577
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
+       private static final long serialVersionUID = 1L;
+
+       private MapFunction<IN, OUT> mapper;
+
+       public MapInvokable(MapFunction<IN, OUT> mapper) {
+               super(mapper);
+               this.mapper = mapper;
+       }
+
+       @Override
+       public void invoke() throws Exception {
+               while (readNext() != null) {
+                       callUserFunctionAndLogException();
+               }
+       }
+
+       @Override
+       protected void callUserFunction() throws Exception {
+               collector.collect(mapper.map(nextObject));
+       }
+
+       @Override
+       public void collect(IN record) {
+               nextObject = copy(record);
+               callUserFunctionAndLogException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
new file mode 100644
index 0000000..c9d9e5a
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+
+public class ProjectInvokable<IN, OUT extends Tuple> extends 
StreamInvokable<IN, OUT> {
+       private static final long serialVersionUID = 1L;
+
+       transient OUT outTuple;
+       TypeSerializer<OUT> outTypeSerializer;
+       int[] fields;
+       int numFields;
+
+       public ProjectInvokable(int[] fields, TypeInformation<OUT> 
outTypeInformation) {
+               super(null);
+               this.fields = fields;
+               this.numFields = this.fields.length;
+               this.outTypeSerializer = outTypeInformation.createSerializer();
+       }
+
+       @Override
+       public void invoke() throws Exception {
+               while (readNext() != null) {
+                       callUserFunctionAndLogException();
+               }
+       }
+
+       @Override
+       protected void callUserFunction() throws Exception {
+               for (int i = 0; i < this.numFields; i++) {
+                       outTuple.setField(nextRecord.getField(fields[i]), i);
+               }
+               collector.collect(outTuple);
+       }
+
+       @Override
+       public void open(Configuration config) throws Exception {
+               super.open(config);
+               outTuple = outTypeSerializer.createInstance();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
new file mode 100644
index 0000000..e1a56cc
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
+       private static final long serialVersionUID = 1L;
+
+       protected ReduceFunction<IN> reducer;
+       protected IN currentValue;
+       protected IN nextValue;
+
+       public StreamReduceInvokable(ReduceFunction<IN> reducer) {
+               super(reducer);
+               this.reducer = reducer;
+               currentValue = null;
+       }
+
+       @Override
+       public void invoke() throws Exception {
+               while (readNext() != null) {
+                       reduce();
+               }
+       }
+
+       protected void reduce() throws Exception {
+               callUserFunctionAndLogException();
+
+       }
+
+       @Override
+       protected void callUserFunction() throws Exception {
+
+               nextValue = nextObject;
+
+               if (currentValue != null) {
+                       currentValue = reducer.reduce(currentValue, nextValue);
+               } else {
+                       currentValue = nextValue;
+
+               }
+               collector.collect(currentValue);
+
+       }
+
+       @Override
+       public void collect(IN record) {
+               nextObject = copy(record);
+               callUserFunctionAndLogException();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
new file mode 100644
index 0000000..b3fdfe8
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.LinkedList;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+public class WindowGroupReduceInvokable<IN, OUT> extends WindowInvokable<IN, 
OUT> {
+
+       private static final long serialVersionUID = 1L;
+       GroupReduceFunction<IN, OUT> reducer;
+
+       public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> 
userFunction,
+                       LinkedList<TriggerPolicy<IN>> triggerPolicies,
+                       LinkedList<EvictionPolicy<IN>> evictionPolicies) {
+               super(userFunction, triggerPolicies, evictionPolicies);
+               this.reducer = userFunction;
+       }
+
+       @Override
+       protected void callUserFunction() throws Exception {
+               reducer.reduce(copyBuffer(), collector);
+       }
+
+       public LinkedList<IN> copyBuffer() {
+               LinkedList<IN> copy = new LinkedList<IN>();
+               for (IN element : buffer) {
+                       copy.add(copy(element));
+               }
+               return copy;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
new file mode 100644
index 0000000..ea891c9
--- /dev/null
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, 
OUT> {
+
+       /**
+        * Auto-generated serial version UID
+        */
+       private static final long serialVersionUID = -8038984294071650730L;
+
+       private LinkedList<TriggerPolicy<IN>> triggerPolicies;
+       private LinkedList<EvictionPolicy<IN>> evictionPolicies;
+       private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
+       private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies;
+       private LinkedList<Thread> activePolicyTreads;
+       protected LinkedList<IN> buffer;
+       private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
+
+       /**
+        * This constructor created a windowing invokable using trigger and 
eviction
+        * policies.
+        * 
+        * @param userFunction
+        *            The user defined {@link ReduceFunction}
+        * @param triggerPolicies
+        *            A list of {@link TriggerPolicy}s and/or
+        *            {@link ActiveTriggerPolicy}s
+        * @param evictionPolicies
+        *            A list of {@link EvictionPolicy}s and/or
+        *            {@link ActiveEvictionPolicy}s
+        */
+       public WindowInvokable(Function userFunction, 
LinkedList<TriggerPolicy<IN>> triggerPolicies,
+                       LinkedList<EvictionPolicy<IN>> evictionPolicies) {
+               super(userFunction);
+
+               this.triggerPolicies = triggerPolicies;
+               this.evictionPolicies = evictionPolicies;
+
+               activeTriggerPolicies = new 
LinkedList<ActiveTriggerPolicy<IN>>();
+               for (TriggerPolicy<IN> tp : triggerPolicies) {
+                       if (tp instanceof ActiveTriggerPolicy) {
+                               
activeTriggerPolicies.add((ActiveTriggerPolicy<IN>) tp);
+                       }
+               }
+
+               activeEvictionPolicies = new 
LinkedList<ActiveEvictionPolicy<IN>>();
+               for (EvictionPolicy<IN> ep : evictionPolicies) {
+                       if (ep instanceof ActiveEvictionPolicy) {
+                               
activeEvictionPolicies.add((ActiveEvictionPolicy<IN>) ep);
+                       }
+               }
+
+               this.activePolicyTreads = new LinkedList<Thread>();
+               this.buffer = new LinkedList<IN>();
+               this.currentTriggerPolicies = new 
LinkedList<TriggerPolicy<IN>>();
+       }
+
+       @Override
+       public void open(org.apache.flink.configuration.Configuration 
parameters) throws Exception {
+               super.open(parameters);
+               for (ActiveTriggerPolicy<IN> tp : activeTriggerPolicies) {
+                       Runnable target = tp.createActiveTriggerRunnable(new 
WindowingCallback(tp));
+                       if (target != null) {
+                               Thread thread = new Thread(target);
+                               activePolicyTreads.add(thread);
+                               thread.start();
+                       }
+               }
+       };
+
+       /**
+        * This class allows the active trigger threads to call back and push 
fake
+        * elements at any time.
+        */
+       private class WindowingCallback implements ActiveTriggerCallback {
+               private ActiveTriggerPolicy<IN> policy;
+
+               public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
+                       this.policy = policy;
+               }
+
+               @Override
+               public void sendFakeElement(Object datapoint) {
+                       processFakeElement(datapoint, this.policy);
+               }
+
+       }
+
+       @Override
+       public void invoke() throws Exception {
+
+               // Prevent empty data streams
+               if (readNext() == null) {
+                       throw new RuntimeException("DataStream must not be 
empty");
+               }
+
+               // Continuously run
+               while (nextRecord != null) {
+                       processRealElement(nextRecord.getObject());
+
+                       // Load next StreamRecord
+                       readNext();
+               }
+
+               // Stop all remaining threads from policies
+               for (Thread t : activePolicyTreads) {
+                       t.interrupt();
+               }
+
+               // finally trigger the buffer.
+               emitFinalWindow(null);
+
+       }
+
+       /**
+        * This method gets called in case of an grouped windowing in case 
central
+        * trigger occurred and the arriving element causing the trigger is not 
part
+        * of this group.
+        * 
+        * Remark: This is NOT the same as
+        * {@link WindowInvokable#processFakeElement(Object, TriggerPolicy)}! 
Here
+        * the eviction using active policies takes place after the call to the 
UDF.
+        * Usually it is done before when fake elements get submitted. This 
special
+        * behaviour is needed to allow the {@link GroupedWindowInvokable} to 
send
+        * central triggers to all groups, even if the current element does not
+        * belong to the group.
+        * 
+        * @param input
+        *            a fake input element
+        * @param policies
+        *            the list of policies which caused the call with this fake
+        *            element
+        */
+       protected synchronized void externalTriggerFakeElement(IN input,
+                       List<TriggerPolicy<IN>> policies) {
+
+               // Set the current triggers
+               currentTriggerPolicies.addAll(policies);
+
+               // emit
+               callUserFunctionAndLogException();
+
+               // clear the flag collection
+               currentTriggerPolicies.clear();
+
+               // Process the evictions and take care of double evictions
+               // In case there are multiple eviction policies present,
+               // only the one with the highest return value is recognized.
+               int currentMaxEviction = 0;
+               for (ActiveEvictionPolicy<IN> evictionPolicy : 
activeEvictionPolicies) {
+                       // use temporary variable to prevent multiple calls to
+                       // notifyEviction
+                       int tmp = 
evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size());
+                       if (tmp > currentMaxEviction) {
+                               currentMaxEviction = tmp;
+                       }
+               }
+
+               for (int i = 0; i < currentMaxEviction; i++) {
+                       try {
+                               buffer.removeFirst();
+                       } catch (NoSuchElementException e) {
+                               // In case no more elements are in the buffer:
+                               // Prevent failure and stop deleting.
+                               break;
+                       }
+               }
+       }
+
+       /**
+        * This method processed an arrived fake element The method is 
synchronized
+        * to ensure that it cannot interleave with
+        * {@link WindowInvokable#processRealElement(Object)}
+        * 
+        * @param input
+        *            a fake input element
+        * @param currentPolicy
+        *            the policy which produced this fake element
+        */
+       protected synchronized void processFakeElement(Object input, 
TriggerPolicy<IN> currentPolicy) {
+
+               // Process the evictions and take care of double evictions
+               // In case there are multiple eviction policies present,
+               // only the one with the highest return value is recognized.
+               int currentMaxEviction = 0;
+               for (ActiveEvictionPolicy<IN> evictionPolicy : 
activeEvictionPolicies) {
+                       // use temporary variable to prevent multiple calls to
+                       // notifyEviction
+                       int tmp = 
evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size());
+                       if (tmp > currentMaxEviction) {
+                               currentMaxEviction = tmp;
+                       }
+               }
+
+               for (int i = 0; i < currentMaxEviction; i++) {
+                       try {
+                               buffer.removeFirst();
+                       } catch (NoSuchElementException e) {
+                               // In case no more elements are in the buffer:
+                               // Prevent failure and stop deleting.
+                               break;
+                       }
+               }
+
+               // Set the current trigger
+               currentTriggerPolicies.add(currentPolicy);
+
+               // emit
+               callUserFunctionAndLogException();
+
+               // clear the flag collection
+               currentTriggerPolicies.clear();
+       }
+
+       /**
+        * This method processed an arrived real element The method is 
synchronized
+        * to ensure that it cannot interleave with
+        * {@link WindowInvokable#processFakeElement(Object)}.
+        * 
+        * @param input
+        *            a real input element
+        * @param triggerPolicies
+        *            Allows to set trigger policies which are maintained
+        *            externally. This is the case for central policies in
+        *            {@link GroupedWindowInvokable}.
+        */
+       protected synchronized void processRealElement(IN input, 
List<TriggerPolicy<IN>> triggerPolicies) {
+               this.currentTriggerPolicies.addAll(triggerPolicies);
+               processRealElement(input);
+       }
+
+       /**
+        * This method processed an arrived real element The method is 
synchronized
+        * to ensure that it cannot interleave with
+        * {@link WindowInvokable#processFakeElement(Object)}
+        * 
+        * @param input
+        *            a real input element
+        */
+       protected synchronized void processRealElement(IN input) {
+
+               // Run the precalls to detect missed windows
+               for (ActiveTriggerPolicy<IN> trigger : activeTriggerPolicies) {
+                       // Remark: In case multiple active triggers are present 
the ordering
+                       // of the different fake elements returned by this 
triggers becomes
+                       // a problem. This might lead to unexpected results...
+                       // Should we limit the number of active triggers to 0 
or 1?
+                       Object[] result = trigger.preNotifyTrigger(input);
+                       for (Object in : result) {
+                               processFakeElement(in, trigger);
+                       }
+               }
+
+               // Remember if a trigger occurred
+               boolean isTriggered = false;
+
+               // Process the triggers
+               for (TriggerPolicy<IN> triggerPolicy : triggerPolicies) {
+                       if (triggerPolicy.notifyTrigger(input)) {
+                               currentTriggerPolicies.add(triggerPolicy);
+                       }
+               }
+
+               // call user function
+               if (!currentTriggerPolicies.isEmpty()) {
+                       // emit
+                       callUserFunctionAndLogException();
+
+                       // clear the flag collection
+                       currentTriggerPolicies.clear();
+
+                       // remember trigger
+                       isTriggered = true;
+               }
+
+               // Process the evictions and take care of double evictions
+               // In case there are multiple eviction policies present,
+               // only the one with the highest return value is recognized.
+               int currentMaxEviction = 0;
+
+               for (EvictionPolicy<IN> evictionPolicy : evictionPolicies) {
+                       // use temporary variable to prevent multiple calls to
+                       // notifyEviction
+                       int tmp = evictionPolicy.notifyEviction(input, 
isTriggered, buffer.size());
+                       if (tmp > currentMaxEviction) {
+                               currentMaxEviction = tmp;
+                       }
+               }
+
+               for (int i = 0; i < currentMaxEviction; i++) {
+                       try {
+                               buffer.removeFirst();
+                       } catch (NoSuchElementException e) {
+                               // In case no more elements are in the buffer:
+                               // Prevent failure and stop deleting.
+                               break;
+                       }
+               }
+
+               // Add the current element to the buffer
+               buffer.add(input);
+
+       }
+
+       /**
+        * This method removes the first element from the element buffer. It is 
used
+        * to provide central evictions in {@link GroupedWindowInvokable}
+        */
+       protected synchronized void evictFirst() {
+               try {
+                       buffer.removeFirst();
+               } catch (NoSuchElementException e) {
+                       // ignore exception
+               }
+       }
+
+       /**
+        * This method returns whether the element buffer is empty or not. It is
+        * used to figure out if a group can be deleted or not when
+        * {@link GroupedWindowInvokable} is used.
+        * 
+        * @return true in case the buffer is empty otherwise false.
+        */
+       protected boolean isBufferEmpty() {
+               return buffer.isEmpty();
+       }
+
+       /**
+        * This method does the final reduce at the end of the stream and emits 
the
+        * result.
+        * 
+        * @param centralTriggerPolicies
+        *            Allows to set trigger policies which are maintained
+        *            externally. This is the case for central policies in
+        *            {@link GroupedWindowInvokable}.
+        */
+       protected void emitFinalWindow(List<TriggerPolicy<IN>> 
centralTriggerPolicies) {
+               if (!buffer.isEmpty()) {
+                       currentTriggerPolicies.clear();
+
+                       if (centralTriggerPolicies != null) {
+                               
currentTriggerPolicies.addAll(centralTriggerPolicies);
+                       }
+
+                       for (TriggerPolicy<IN> policy : triggerPolicies) {
+                               currentTriggerPolicies.add(policy);
+                       }
+
+                       callUserFunctionAndLogException();
+               }
+       }
+
+}

Reply via email to