http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java new file mode 100755 index 0000000..8afac75 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.source; + +import java.util.Arrays; +import java.util.Collection; + +import org.apache.flink.util.Collector; + +public class FromElementsFunction<T> implements SourceFunction<T> { + private static final long serialVersionUID = 1L; + + Iterable<T> iterable; + + public FromElementsFunction(T... 
elements) { + this.iterable = Arrays.asList(elements); + } + + public FromElementsFunction(Collection<T> elements) { + this.iterable = elements; + } + + public FromElementsFunction(Iterable<T> elements) { + this.iterable = elements; + } + + @Override + public void invoke(Collector<T> collector) throws Exception { + for (T element : iterable) { + collector.collect(element); + } + } + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java new file mode 100755 index 0000000..3afd06e --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.function.source; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; +import org.apache.flink.util.NumberSequenceIterator; + +/** + * Source Function used to generate the number sequence + * + */ +public class GenSequenceFunction extends RichParallelSourceFunction<Long> { + + private static final long serialVersionUID = 1L; + + private NumberSequenceIterator fullIterator; + private NumberSequenceIterator splitIterator; + + public GenSequenceFunction(long from, long to) { + fullIterator = new NumberSequenceIterator(from, to); + } + + @Override + public void invoke(Collector<Long> collector) throws Exception { + while (splitIterator.hasNext()) { + collector.collect(splitIterator.next()); + } + } + + @Override + public void open(Configuration config) { + int splitNumber = getRuntimeContext().getIndexOfThisSubtask(); + int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks(); + splitIterator = fullIterator.split(numOfSubTasks)[splitNumber]; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java new file mode 100644 index 0000000..664d39a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenericSourceFunction.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.source; + +import org.apache.flink.api.common.typeinfo.TypeInformation; + +public interface GenericSourceFunction<T> { + + public TypeInformation<T> getType(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java new file mode 100644 index 0000000..46d4fe9 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.source; + +import org.apache.flink.util.Collector; + +public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> { + + public void invoke(Collector<OUT> collector) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java new file mode 100644 index 0000000..5bbfd4c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichParallelSourceFunction.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.source; + +import org.apache.flink.api.common.functions.AbstractRichFunction; + +public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction implements + ParallelSourceFunction<OUT> { + + private static final long serialVersionUID = 1L; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java new file mode 100755 index 0000000..4b947c7 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/RichSourceFunction.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.source; + +import org.apache.flink.api.common.functions.AbstractRichFunction; + +public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements + SourceFunction<OUT> { + + private static final long serialVersionUID = 1L; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java new file mode 100644 index 0000000..ac82b10 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.source; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.InetSocketAddress; +import java.net.Socket; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; + +public class SocketTextStreamFunction extends RichSourceFunction<String> { + private static final long serialVersionUID = 1L; + + private String hostname; + private int port; + private char delimiter; + private Socket socket; + private static final int CONNECTION_TIMEOUT_TIME = 0; + + public SocketTextStreamFunction(String hostname, int port, char delimiter) { + this.hostname = hostname; + this.port = port; + this.delimiter = delimiter; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + socket = new Socket(); + + socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME); + } + + @Override + public void invoke(Collector<String> collector) throws Exception { + while (!socket.isClosed() && socket.isConnected()) { + streamFromSocket(collector, socket); + } + } + + public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception { + StringBuffer buffer = new StringBuffer(); + BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + + while (true) { + int data = 
reader.read(); + if (!socket.isConnected() || socket.isClosed() || data == -1) { + break; + } + + if (data == delimiter) { + collector.collect(buffer.toString()); + buffer = new StringBuffer(); + } else if (data != '\r') { // ignore carriage return + buffer.append((char) data); + } + } + + if (buffer.length() > 0) { + collector.collect(buffer.toString()); + } + } + + @Override + public void close() throws Exception { + socket.close(); + super.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java new file mode 100755 index 0000000..917562a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.function.source; + +import java.io.Serializable; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.util.Collector; + +public interface SourceFunction<OUT> extends Function, Serializable { + + public void invoke(Collector<OUT> collector) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java new file mode 100644 index 0000000..24c0319 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.invokable; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; +import org.apache.flink.util.Collector; + +public abstract class ChainableInvokable<IN, OUT> extends StreamInvokable<IN, OUT> implements + Collector<IN> { + + private static final long serialVersionUID = 1L; + + public ChainableInvokable(Function userFunction) { + super(userFunction); + setChainingStrategy(ChainingStrategy.ALWAYS); + } + + public void setup(Collector<OUT> collector, StreamRecordSerializer<IN> inSerializer) { + this.collector = collector; + this.inSerializer = inSerializer; + this.objectSerializer = inSerializer.getObjectSerializer(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java new file mode 100755 index 0000000..13a6ba1 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable; + +import org.apache.flink.streaming.api.function.sink.SinkFunction; + +public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> { + private static final long serialVersionUID = 1L; + + private SinkFunction<IN> sinkFunction; + + public SinkInvokable(SinkFunction<IN> sinkFunction) { + super(sinkFunction); + this.sinkFunction = sinkFunction; + } + + @Override + public void invoke() throws Exception { + while (readNext() != null) { + callUserFunctionAndLogException(); + } + } + + @Override + protected void callUserFunction() throws Exception { + sinkFunction.invoke(nextObject); + } + + @Override + public void collect(IN record) { + nextObject = copy(record); + callUserFunctionAndLogException(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java new file mode 100644 index 0000000..f1cf2c5 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable; + +import java.io.Serializable; + +import org.apache.flink.streaming.api.function.source.SourceFunction; + +public class SourceInvokable<OUT> extends StreamInvokable<OUT, OUT> implements Serializable { + + private static final long serialVersionUID = 1L; + + private SourceFunction<OUT> sourceFunction; + + public SourceInvokable(SourceFunction<OUT> sourceFunction) { + super(sourceFunction); + this.sourceFunction = sourceFunction; + } + + @Override + public void invoke() { + callUserFunctionAndLogException(); + } + + @Override + protected void callUserFunction() throws Exception { + sourceFunction.invoke(collector); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java new file mode 100644 index 0000000..6cee5f2 
--- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable; + +import java.io.IOException; +import java.io.Serializable; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.streamrecord.StreamRecord; +import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.api.streamvertex.StreamTaskContext; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.apache.flink.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The StreamInvokable represents the base class for all invokables in the + * streaming topology. 
+ * + * @param <OUT> + * The output type of the invokable + */ +public abstract class StreamInvokable<IN, OUT> implements Serializable { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class); + + protected StreamTaskContext<OUT> taskContext; + + protected MutableObjectIterator<StreamRecord<IN>> recordIterator; + protected StreamRecordSerializer<IN> inSerializer; + protected TypeSerializer<IN> objectSerializer; + protected StreamRecord<IN> nextRecord; + protected IN nextObject; + protected boolean isMutable; + + protected Collector<OUT> collector; + protected Function userFunction; + protected volatile boolean isRunning; + + private ChainingStrategy chainingStrategy = ChainingStrategy.HEAD; + + public StreamInvokable(Function userFunction) { + this.userFunction = userFunction; + } + + /** + * Initializes the {@link StreamInvokable} for input and output handling + * + * @param taskContext + * StreamTaskContext representing the vertex + */ + public void setup(StreamTaskContext<OUT> taskContext) { + this.collector = taskContext.getOutputCollector(); + this.recordIterator = taskContext.getInput(0); + this.inSerializer = taskContext.getInputSerializer(0); + if (this.inSerializer != null) { + this.nextRecord = inSerializer.createInstance(); + this.objectSerializer = inSerializer.getObjectSerializer(); + } + this.taskContext = taskContext; + } + + /** + * Method that will be called when the operator starts, should encode the + * processing logic + */ + public abstract void invoke() throws Exception; + + /* + * Reads the next record from the reader iterator and stores it in the + * nextRecord variable + */ + protected StreamRecord<IN> readNext() { + this.nextRecord = inSerializer.createInstance(); + try { + nextRecord = recordIterator.next(nextRecord); + try { + nextObject = nextRecord.getObject(); + } catch (NullPointerException e) { + // end of stream + } + return nextRecord; + } catch 
(IOException e) { + throw new RuntimeException("Could not read next record."); + } + } + + /** + * The call of the user implemented function should be implemented here + */ + protected void callUserFunction() throws Exception { + } + + /** + * Method for logging exceptions thrown during the user function call + */ + protected void callUserFunctionAndLogException() { + try { + callUserFunction(); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Calling user function failed due to: {}", + StringUtils.stringifyException(e)); + } + } + } + + /** + * Open method to be used if the user defined function extends the + * RichFunction class + * + * @param parameters + * The configuration parameters for the operator + */ + public void open(Configuration parameters) throws Exception { + isRunning = true; + FunctionUtils.openFunction(userFunction, parameters); + } + + /** + * Close method to be used if the user defined function extends the + * RichFunction class + * + */ + public void close() { + isRunning = false; + collector.close(); + try { + FunctionUtils.closeFunction(userFunction); + } catch (Exception e) { + throw new RuntimeException("Error when closing the function: " + e.getMessage()); + } + } + + public void setRuntimeContext(RuntimeContext t) { + FunctionUtils.setFunctionRuntimeContext(userFunction, t); + } + + protected IN copy(IN record) { + return objectSerializer.copy(record); + } + + public void setChainingStrategy(ChainingStrategy strategy) { + if (strategy == ChainingStrategy.ALWAYS) { + if (!(this instanceof ChainableInvokable)) { + throw new RuntimeException( + "Invokable needs to extend ChainableInvokable to be chained"); + } + } + this.chainingStrategy = strategy; + } + + public ChainingStrategy getChainingStrategy() { + return chainingStrategy; + } + + public static enum ChainingStrategy { + ALWAYS, NEVER, HEAD; + } + + public Function getUserFunction() { + return userFunction; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java new file mode 100755 index 0000000..c3475e9 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchIterator.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * A serializable {@link Iterator} that can additionally be reset.
+ *
+ * @param <IN>
+ *            The type of the elements returned by this iterator.
+ */
+public interface BatchIterator<IN> extends Iterator<IN>, Serializable {
+
+    /**
+     * Resets this iterator. NOTE(review): the position the iterator returns
+     * to is decided by the implementations, which are not part of this
+     * file - confirm before relying on it.
+     */
+    void reset();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
new file mode 100644
index 0000000..3fc314c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+/**
+ * Invokable that emits a running count: for every input element the counter
+ * is incremented and the new value is forwarded to the collector.
+ *
+ * @param <IN>
+ *            The type of the counted input elements.
+ */
+public class CounterInvokable<IN> extends ChainableInvokable<IN, Long> {
+    private static final long serialVersionUID = 1L;
+
+    Long count = 0L;
+
+    public CounterInvokable() {
+        // there is no user function to wrap
+        super(null);
+    }
+
+    /**
+     * Emits one incremented count per record until {@code readNext()}
+     * returns {@code null}.
+     */
+    @Override
+    public void invoke() throws Exception {
+        while (readNext() != null) {
+            emitIncrementedCount();
+        }
+    }
+
+    /**
+     * Counts the given record; the record itself is not inspected.
+     */
+    @Override
+    public void collect(IN record) {
+        emitIncrementedCount();
+    }
+
+    /**
+     * Increments the counter and forwards the new value.
+     */
+    private void emitIncrementedCount() {
+        count = count + 1;
+        collector.collect(count);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
new file mode 100644
index 0000000..0c8298e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+/**
+ * Invokable that forwards only those elements for which the wrapped
+ * {@link FilterFunction} returns {@code true}.
+ *
+ * @param <IN>
+ *            The type of the filtered elements.
+ */
+public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
+
+    private static final long serialVersionUID = 1L;
+
+    FilterFunction<IN> filterFunction;
+    // result of the most recent filter call
+    private boolean collect;
+
+    public FilterInvokable(FilterFunction<IN> filterFunction) {
+        super(filterFunction);
+        this.filterFunction = filterFunction;
+    }
+
+    /**
+     * Applies the filter to every record until {@code readNext()} returns
+     * {@code null}.
+     */
+    @Override
+    public void invoke() throws Exception {
+        while (readNext() != null) {
+            callUserFunctionAndLogException();
+        }
+    }
+
+    /**
+     * Evaluates the filter on a copy of the current element and emits the
+     * element itself when the filter accepts it.
+     */
+    @Override
+    protected void callUserFunction() throws Exception {
+        collect = filterFunction.filter(copy(nextObject));
+        if (!collect) {
+            return;
+        }
+        collector.collect(nextObject);
+    }
+
+    /**
+     * Entry point for chained execution: takes a copy of the record as the
+     * current element and runs the filter on it.
+     */
+    @Override
+    public void collect(IN record) {
+        nextObject = copy(record);
+        callUserFunctionAndLogException();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
new file mode 100644
index 0000000..2a4081b
--- /dev/null
+++
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+/**
+ * Invokable that hands every input element, together with the output
+ * collector, to the wrapped {@link FlatMapFunction}.
+ *
+ * @param <IN>
+ *            The type of the input elements.
+ * @param <OUT>
+ *            The type of the elements the function may emit.
+ */
+public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
+    private static final long serialVersionUID = 1L;
+
+    private FlatMapFunction<IN, OUT> flatMapper;
+
+    public FlatMapInvokable(FlatMapFunction<IN, OUT> flatMapper) {
+        super(flatMapper);
+        this.flatMapper = flatMapper;
+    }
+
+    /**
+     * Runs the flat map function for every record until {@code readNext()}
+     * returns {@code null}.
+     */
+    @Override
+    public void invoke() throws Exception {
+        while (readNext() != null) {
+            callUserFunctionAndLogException();
+        }
+    }
+
+    /**
+     * Passes the current element and the collector to the flat map function.
+     */
+    @Override
+    protected void callUserFunction() throws Exception {
+        flatMapper.flatMap(nextObject, collector);
+    }
+
+    /**
+     * Entry point for chained execution: takes a copy of the record as the
+     * current element and runs the flat map function on it.
+     */
+    @Override
+    public void collect(IN record) {
+        final IN copied = copy(record);
+        nextObject = copied;
+        callUserFunctionAndLogException();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java new file mode 100755 index 0000000..c2177fa --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+
+/**
+ * Reduce invokable that keeps one running reduce result per key and emits
+ * the updated result of the key for every arriving element.
+ *
+ * @param <IN>
+ *            The type of the reduced elements.
+ */
+public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
+    private static final long serialVersionUID = 1L;
+
+    private KeySelector<IN, ?> keySelector;
+    private Map<Object, IN> values;
+    private IN reduced;
+    // Set by callUserFunction() only when the reducer returned normally.
+    private transient boolean reduceSucceeded;
+
+    /**
+     * @param reducer
+     *            The reduce function applied within each group.
+     * @param keySelector
+     *            Extracts the grouping key from an element.
+     */
+    public GroupedReduceInvokable(ReduceFunction<IN> reducer, KeySelector<IN, ?> keySelector) {
+        super(reducer);
+        this.keySelector = keySelector;
+        values = new HashMap<Object, IN>();
+    }
+
+    /**
+     * Combines the current element with the value stored for its key, then
+     * stores and emits the result. The first element of a key is stored and
+     * emitted unchanged.
+     */
+    @Override
+    protected void reduce() throws Exception {
+        Object key = nextRecord.getKey(keySelector);
+        currentValue = values.get(key);
+        nextValue = nextObject;
+
+        if (currentValue == null) {
+            // first element seen for this key
+            values.put(key, nextValue);
+            collector.collect(nextValue);
+            return;
+        }
+
+        // callUserFunctionAndLogException() only logs a failing reducer. In
+        // that case 'reduced' still holds the result of an earlier call
+        // (possibly for a different key), so it must neither be stored nor
+        // emitted; the value kept for this key stays untouched.
+        reduceSucceeded = false;
+        callUserFunctionAndLogException();
+        if (reduceSucceeded) {
+            values.put(key, reduced);
+            collector.collect(reduced);
+        }
+    }
+
+    /**
+     * Applies the reducer to the stored and the new value and records that
+     * the call completed.
+     */
+    @Override
+    protected void callUserFunction() throws Exception {
+        reduced = reducer.reduce(currentValue, nextValue);
+        reduceSucceeded = true;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
new file mode 100644
index 0000000..997463c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/**
+ * This invokable allows windowing based on {@link TriggerPolicy} and
+ * {@link EvictionPolicy} instances including their active and cloneable
+ * versions. It is additionally aware of the creation of windows per group.
+ *
+ * A {@link KeySelector} is used to specify the key position or key extraction.
+ * The {@link ReduceFunction} will be executed on each group separately.
+ * Policies might either be centralized or distributed. It is not possible to
+ * use central and distributed eviction policies at the same time. A distributed
+ * policy has to be a {@link CloneableTriggerPolicy} or
+ * {@link CloneableEvictionPolicy} as it will be cloned to have separate
+ * instances for each group. At startup time the distributed policies are
+ * stored as samples, and only clones of them are used to maintain the
+ * groups. Therefore, each group starts with the initial policy states.
+ *
+ * While a distributed policy only gets notified with the elements belonging to
+ * the respective group, a centralized policy gets notified with all arriving
+ * elements. When a centralized trigger occurs, all groups get triggered. This
+ * is done by submitting the element which caused the trigger as real element to
+ * the group it belongs to and as fake element to all other groups. Within the
+ * groups the element might be further processed, causing more triggers,
+ * prenotifications of active distributed policies and evictions like usual.
+ *
+ * Central policies can be instances of {@link ActiveTriggerPolicy} and also
+ * implement the
+ * {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
+ * method. Fake elements created on prenotification will be forwarded to all
+ * groups. The {@link ActiveTriggerCallback} is also implemented in a way that
+ * it forwards calls to all groups.
+ *
+ * @param <IN>
+ *            The type of input elements handled by this operator invokable.
+ * @param <OUT>
+ *            The type of the elements emitted by this operator invokable.
+ */
+public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
+
+    /**
+     * Auto-generated serial version UID
+     */
+    private static final long serialVersionUID = -3469545957144404137L;
+
+    private KeySelector<IN, ?> keySelector;
+    private Configuration parameters;
+    private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies;
+    private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies;
+    private LinkedList<ActiveEvictionPolicy<IN>> activeCentralEvictionPolicies;
+    private LinkedList<EvictionPolicy<IN>> centralEvictionPolicies;
+    private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies;
+    private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies;
+    private Map<Object, WindowInvokable<IN, OUT>> windowingGroups;
+    private LinkedList<Thread> activePolicyThreads;
+    private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
+    private LinkedList<WindowInvokable<IN, OUT>> deleteOrderForCentralEviction;
+
+    /**
+     * This constructor creates an instance of the grouped windowing invokable.
+     * The interplay of central and distributed policies is described in the
+     * class documentation. Every policy list may be {@code null}, which is
+     * treated like an empty list.
+     *
+     * @param userFunction
+     *            The user defined function.
+     * @param keySelector
+     *            A key selector to extract the key for the groups from the
+     *            input data.
+     * @param distributedTriggerPolicies
+     *            Trigger policies to be distributed and maintained individually
+     *            within each group.
+     * @param distributedEvictionPolicies
+     *            Eviction policies to be distributed and maintained
+     *            individually within each group. Note that there cannot be
+     *            both, central and distributed eviction policies at the same
+     *            time.
+     * @param centralTriggerPolicies
+     *            Trigger policies which will only exist once at a central
+     *            place. In case a central policy triggers, it will cause all
+     *            groups to be emitted. (Remark: Empty groups cannot be emitted.
+     *            If only one element is contained in a group, this element
+     *            itself is returned as aggregated result.)
+     * @param centralEvictionPolicies
+     *            Eviction policies which will only exist once at a central
+     *            place. Note that there cannot be both, central and distributed
+     *            eviction policies at the same time. The central eviction
+     *            policy will work on a simulated element buffer containing all
+     *            elements no matter which group they belong to.
+     * @throws UnsupportedOperationException
+     *             If central and distributed eviction policies are combined,
+     *             or if no eviction policy or no trigger policy is given.
+     */
+    public GroupedWindowInvokable(Function userFunction, KeySelector<IN, ?> keySelector,
+            LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
+            LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
+            LinkedList<TriggerPolicy<IN>> centralTriggerPolicies,
+            LinkedList<EvictionPolicy<IN>> centralEvictionPolicies) {
+
+        super(userFunction);
+
+        this.keySelector = keySelector;
+
+        // handle the triggers
+        // The list of active central triggers is always created: invoke() and
+        // open() iterate over it even when no central triggers were passed.
+        this.activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
+
+        if (centralTriggerPolicies != null) {
+            this.centralTriggerPolicies = centralTriggerPolicies;
+
+            for (TriggerPolicy<IN> trigger : centralTriggerPolicies) {
+                if (trigger instanceof ActiveTriggerPolicy) {
+                    this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
+                }
+            }
+        } else {
+            this.centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+        }
+
+        if (distributedTriggerPolicies != null) {
+            this.distributedTriggerPolicies = distributedTriggerPolicies;
+        } else {
+            this.distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
+        }
+
+        if (distributedEvictionPolicies != null) {
+            this.distributedEvictionPolicies = distributedEvictionPolicies;
+        } else {
+            this.distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
+        }
+
+        this.activeCentralEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
+
+        if (centralEvictionPolicies != null) {
+            this.centralEvictionPolicies = centralEvictionPolicies;
+
+            for (EvictionPolicy<IN> eviction : centralEvictionPolicies) {
+                if (eviction instanceof ActiveEvictionPolicy) {
+                    this.activeCentralEvictionPolicies.add((ActiveEvictionPolicy<IN>) eviction);
+                }
+            }
+        } else {
+            this.centralEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
+        }
+
+        this.windowingGroups = new HashMap<Object, WindowInvokable<IN, OUT>>();
+        this.activePolicyThreads = new LinkedList<Thread>();
+        this.currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+        this.deleteOrderForCentralEviction = new LinkedList<WindowInvokable<IN, OUT>>();
+
+        // check that not both, central and distributed eviction, is used at the
+        // same time.
+        if (!this.centralEvictionPolicies.isEmpty() && !this.distributedEvictionPolicies.isEmpty()) {
+            throw new UnsupportedOperationException(
+                    "You can only use either central or distributed eviction policies but not both at the same time.");
+        }
+
+        // Check that there is at least one trigger and one eviction policy
+        if (this.centralEvictionPolicies.isEmpty() && this.distributedEvictionPolicies.isEmpty()) {
+            throw new UnsupportedOperationException(
+                    "You have to define at least one eviction policy");
+        }
+        if (this.centralTriggerPolicies.isEmpty() && this.distributedTriggerPolicies.isEmpty()) {
+            throw new UnsupportedOperationException(
+                    "You have to define at least one trigger policy");
+        }
+
+    }
+
+    /**
+     * Reads the input record by record, routes every element to the group of
+     * its key, notifies the central policies and finally emits the remaining
+     * buffer of every group.
+     *
+     * @throws RuntimeException
+     *             If the input does not contain a single record.
+     */
+    @Override
+    public void invoke() throws Exception {
+        // Prevent empty data streams
+        if (readNext() == null) {
+            throw new RuntimeException("DataStream must not be empty");
+        }
+
+        // Continuously run
+        while (nextRecord != null) {
+            WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector
+                    .getKey(nextRecord.getObject()));
+            if (groupInvokable == null) {
+                groupInvokable = makeNewGroup(nextRecord);
+            }
+
+            // Run the precalls for central active triggers
+            for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
+                Object[] result = trigger.preNotifyTrigger(nextRecord.getObject());
+                for (Object in : result) {
+
+                    // If central eviction is used, handle it here
+                    if (!activeCentralEvictionPolicies.isEmpty()) {
+                        evictElements(centralActiveEviction(in));
+                    }
+
+                    // process in groups
+                    for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
+                        group.processFakeElement(in, trigger);
+                        checkForEmptyGroupBuffer(group);
+                    }
+                }
+            }
+
+            // Process non-active central triggers
+            for (TriggerPolicy<IN> triggerPolicy : centralTriggerPolicies) {
+                if (triggerPolicy.notifyTrigger(nextRecord.getObject())) {
+                    currentTriggerPolicies.add(triggerPolicy);
+                }
+            }
+
+            if (currentTriggerPolicies.isEmpty()) {
+
+                // only add the element to its group
+                groupInvokable.processRealElement(nextRecord.getObject());
+                checkForEmptyGroupBuffer(groupInvokable);
+
+                // If central eviction is used, handle it here
+                if (!centralEvictionPolicies.isEmpty()) {
+                    evictElements(centralEviction(nextRecord.getObject(), false));
+                    deleteOrderForCentralEviction.add(groupInvokable);
+                }
+
+            } else {
+
+                // call user function for all groups
+                for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
+                    if (group == groupInvokable) {
+                        // process real with initialized policies
+                        group.processRealElement(nextRecord.getObject(), currentTriggerPolicies);
+                    } else {
+                        // process like a fake but also initialized with
+                        // policies
+                        group.externalTriggerFakeElement(nextRecord.getObject(),
+                                currentTriggerPolicies);
+                    }
+
+                    // remove group in case it has an empty buffer
+                    // checkForEmptyGroupBuffer(group);
+                }
+
+                // If central eviction is used, handle it here
+                if (!centralEvictionPolicies.isEmpty()) {
+                    evictElements(centralEviction(nextRecord.getObject(), true));
+                    deleteOrderForCentralEviction.add(groupInvokable);
+                }
+            }
+
+            // clear current trigger list
+            currentTriggerPolicies.clear();
+
+            // read next record
+            readNext();
+        }
+
+        // Stop all remaining threads from policies
+        for (Thread t : activePolicyThreads) {
+            t.interrupt();
+        }
+
+        // finally trigger the buffer.
+        for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
+            group.emitFinalWindow(centralTriggerPolicies);
+        }
+
+    }
+
+    /**
+     * This method creates a new group. The method gets called in case an
+     * element arrives which has a key which was not seen before. The method
+     * creates a nested {@link WindowInvokable} and therefore creates clones of
+     * all distributed trigger and eviction policies.
+     *
+     * @param element
+     *            The element which leads to the generation of a new group
+     *            (previously unseen key)
+     * @return The invokable of the new group, already registered under the
+     *         key of the given element.
+     * @throws Exception
+     *             In case the {@link KeySelector} throws an exception in
+     *             {@link KeySelector#getKey(Object)}, the exception is not
+     *             caught by this method.
+     */
+    @SuppressWarnings("unchecked")
+    private WindowInvokable<IN, OUT> makeNewGroup(StreamRecord<IN> element) throws Exception {
+        // clone the policies
+        LinkedList<TriggerPolicy<IN>> clonedDistributedTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+        LinkedList<EvictionPolicy<IN>> clonedDistributedEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
+        for (CloneableTriggerPolicy<IN> trigger : this.distributedTriggerPolicies) {
+            clonedDistributedTriggerPolicies.add(trigger.clone());
+        }
+        for (CloneableEvictionPolicy<IN> eviction : this.distributedEvictionPolicies) {
+            clonedDistributedEvictionPolicies.add(eviction.clone());
+        }
+
+        WindowInvokable<IN, OUT> groupInvokable;
+        if (userFunction instanceof ReduceFunction) {
+            groupInvokable = (WindowInvokable<IN, OUT>) new WindowReduceInvokable<IN>(
+                    (ReduceFunction<IN>) userFunction, clonedDistributedTriggerPolicies,
+                    clonedDistributedEvictionPolicies);
+        } else {
+            groupInvokable = new WindowGroupReduceInvokable<IN, OUT>(
+                    (GroupReduceFunction<IN, OUT>) userFunction, clonedDistributedTriggerPolicies,
+                    clonedDistributedEvictionPolicies);
+        }
+
+        groupInvokable.setup(taskContext);
+        groupInvokable.open(this.parameters);
+        windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable);
+
+        return groupInvokable;
+    }
+
+    /**
+     * Opens the invokable, remembers the configuration for groups created
+     * later and starts one thread for every active central trigger policy
+     * that provides a runnable.
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        this.parameters = parameters;
+        for (ActiveTriggerPolicy<IN> tp : activeCentralTriggerPolicies) {
+            Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp));
+            if (target != null) {
+                Thread thread = new Thread(target);
+                activePolicyThreads.add(thread);
+                thread.start();
+            }
+        }
+    }
+
+    /**
+     * This method is used to notify central eviction policies with a real
+     * element.
+     *
+     * @param input
+     *            the real element to notify the eviction policy.
+     * @param triggered
+     *            whether a central trigger occurred or not.
+     * @return The number of elements to be deleted from the buffer.
+     */
+    private int centralEviction(IN input, boolean triggered) {
+        // Process the evictions and take care of double evictions
+        // In case there are multiple eviction policies present,
+        // only the one with the highest return value is recognized.
+        int currentMaxEviction = 0;
+        for (EvictionPolicy<IN> evictionPolicy : centralEvictionPolicies) {
+            // use temporary variable to prevent multiple calls to
+            // notifyEviction
+            int tmp = evictionPolicy.notifyEviction(input, triggered,
+                    deleteOrderForCentralEviction.size());
+            if (tmp > currentMaxEviction) {
+                currentMaxEviction = tmp;
+            }
+        }
+        return currentMaxEviction;
+    }
+
+    /**
+     * This method is used to notify active central eviction policies with a
+     * fake element.
+     *
+     * @param input
+     *            the fake element to notify the active central eviction
+     *            policies.
+     * @return The number of elements to be deleted from the buffer.
+     */
+    private int centralActiveEviction(Object input) {
+        // Process the evictions and take care of double evictions
+        // In case there are multiple eviction policies present,
+        // only the one with the highest return value is recognized.
+        int currentMaxEviction = 0;
+        for (ActiveEvictionPolicy<IN> evictionPolicy : activeCentralEvictionPolicies) {
+            // use temporary variable to prevent multiple calls to
+            // notifyEviction
+            int tmp = evictionPolicy.notifyEvictionWithFakeElement(input,
+                    deleteOrderForCentralEviction.size());
+            if (tmp > currentMaxEviction) {
+                currentMaxEviction = tmp;
+            }
+        }
+        return currentMaxEviction;
+    }
+
+    /**
+     * This method is used in central eviction to delete a given number of
+     * elements from the buffer. If fewer elements are buffered than requested,
+     * all of them are deleted and the method stops.
+     *
+     * @param numToEvict
+     *            number of elements to delete from the virtual central element
+     *            buffer.
+     */
+    private void evictElements(int numToEvict) {
+        HashSet<WindowInvokable<IN, OUT>> usedGroups = new HashSet<WindowInvokable<IN, OUT>>();
+
+        // The emptiness check replaces a catch around removeFirst() that could
+        // never trigger, because getFirst() had already thrown on an empty
+        // buffer before the try block was reached.
+        int remaining = numToEvict;
+        while (remaining > 0 && !deleteOrderForCentralEviction.isEmpty()) {
+            WindowInvokable<IN, OUT> currentGroup = deleteOrderForCentralEviction.removeFirst();
+            // Do the eviction
+            currentGroup.evictFirst();
+            // Remember groups which possibly have an empty buffer after the
+            // eviction
+            usedGroups.add(currentGroup);
+            remaining--;
+        }
+
+        // Remove groups with empty buffer
+        for (WindowInvokable<IN, OUT> group : usedGroups) {
+            checkForEmptyGroupBuffer(group);
+        }
+    }
+
+    /**
+     * Checks if the element buffer of a given windowing group is empty and,
+     * if so, asks the group map to drop it (see the note in the body).
+     *
+     * @param group
+     *            The windowing group to be checked and removed in case its
+     *            buffer is empty.
+     */
+    private void checkForEmptyGroupBuffer(WindowInvokable<IN, OUT> group) {
+        if (group.isBufferEmpty()) {
+            // NOTE(review): windowingGroups is keyed by the extracted key, but
+            // the group (a map value) is passed here, so no entry matches
+            // unless a key happens to equal the group. The call is left
+            // unchanged on purpose: this method runs while callers iterate
+            // over windowingGroups.values(), and invoke() registers a new,
+            // still empty group before its first element is processed, so an
+            // effective removal needs a reworked call order first.
+            windowingGroups.remove(group);
+        }
+    }
+
+    /**
+     * This callback class allows to handle the callbacks done by threads
+     * defined in active trigger policies.
+     *
+     * NOTE(review): the callback works on the group map and the central
+     * eviction buffer without synchronization; if the policy threads call it
+     * while invoke() is running, both are accessed concurrently - confirm.
+     *
+     * @see ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)
+     */
+    private class WindowingCallback implements ActiveTriggerCallback {
+        private ActiveTriggerPolicy<IN> policy;
+
+        public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
+            this.policy = policy;
+        }
+
+        @Override
+        public void sendFakeElement(Object datapoint) {
+
+            // If central eviction is used, handle it here (only active
+            // eviction policies can be notified with a fake element)
+            if (!activeCentralEvictionPolicies.isEmpty()) {
+                evictElements(centralActiveEviction(datapoint));
+            }
+
+            // handle element in groups
+            for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
+                group.processFakeElement(datapoint, policy);
+                checkForEmptyGroupBuffer(group);
+            }
+        }
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
new file mode 100644
index 0000000..7c8e577
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+/**
+ * Invokable that applies the wrapped {@link MapFunction} to every input
+ * element and emits the result.
+ *
+ * @param <IN>
+ *            The type of the input elements.
+ * @param <OUT>
+ *            The type of the mapped elements.
+ */
+public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
+    private static final long serialVersionUID = 1L;
+
+    private MapFunction<IN, OUT> mapper;
+
+    public MapInvokable(MapFunction<IN, OUT> mapper) {
+        super(mapper);
+        this.mapper = mapper;
+    }
+
+    /**
+     * Maps every record until {@code readNext()} returns {@code null}.
+     */
+    @Override
+    public void invoke() throws Exception {
+        while (readNext() != null) {
+            callUserFunctionAndLogException();
+        }
+    }
+
+    /**
+     * Maps the current element and forwards the result to the collector.
+     */
+    @Override
+    protected void callUserFunction() throws Exception {
+        final OUT mapped = mapper.map(nextObject);
+        collector.collect(mapped);
+    }
+
+    /**
+     * Entry point for chained execution: takes a copy of the record as the
+     * current element and maps it.
+     */
+    @Override
+    public void collect(IN record) {
+        final IN copied = copy(record);
+        nextObject = copied;
+        callUserFunctionAndLogException();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
new file mode
100644 index 0000000..c9d9e5a --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.invokable.operator; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.invokable.StreamInvokable; + +public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, OUT> { + private static final long serialVersionUID = 1L; + + transient OUT outTuple; + TypeSerializer<OUT> outTypeSerializer; + int[] fields; + int numFields; + + public ProjectInvokable(int[] fields, TypeInformation<OUT> outTypeInformation) { + super(null); + this.fields = fields; + this.numFields = this.fields.length; + this.outTypeSerializer = outTypeInformation.createSerializer(); + } + + @Override + public void invoke() throws Exception { + while (readNext() != null) { + callUserFunctionAndLogException(); + } + } + + @Override + protected void callUserFunction() throws Exception { + for (int i = 0; i < this.numFields; i++) { + outTuple.setField(nextRecord.getField(fields[i]), i); + } + collector.collect(outTuple); + } + + @Override + public void open(Configuration config) throws Exception { + super.open(config); + outTuple = outTypeSerializer.createInstance(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java new file mode 100644 index 0000000..e1a56cc --- /dev/null +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.api.invokable.ChainableInvokable; + +public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> { + private static final long serialVersionUID = 1L; + + protected ReduceFunction<IN> reducer; + protected IN currentValue; + protected IN nextValue; + + public StreamReduceInvokable(ReduceFunction<IN> reducer) { + super(reducer); + this.reducer = reducer; + currentValue = null; + } + + @Override + public void invoke() throws Exception { + while (readNext() != null) { + reduce(); + } + } + + protected void reduce() throws Exception { + callUserFunctionAndLogException(); + + } + + @Override + protected void callUserFunction() throws Exception { + + nextValue = nextObject; + + if (currentValue != null) { + currentValue = reducer.reduce(currentValue, nextValue); + } else { + currentValue = nextValue; + + } + collector.collect(currentValue); + + } 
+ + @Override + public void collect(IN record) { + nextObject = copy(record); + callUserFunctionAndLogException(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java new file mode 100644 index 0000000..b3fdfe8 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.invokable.operator; + +import java.util.LinkedList; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; +import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; + +public class WindowGroupReduceInvokable<IN, OUT> extends WindowInvokable<IN, OUT> { + + private static final long serialVersionUID = 1L; + GroupReduceFunction<IN, OUT> reducer; + + public WindowGroupReduceInvokable(GroupReduceFunction<IN, OUT> userFunction, + LinkedList<TriggerPolicy<IN>> triggerPolicies, + LinkedList<EvictionPolicy<IN>> evictionPolicies) { + super(userFunction, triggerPolicies, evictionPolicies); + this.reducer = userFunction; + } + + @Override + protected void callUserFunction() throws Exception { + reducer.reduce(copyBuffer(), collector); + } + + public LinkedList<IN> copyBuffer() { + LinkedList<IN> copy = new LinkedList<IN>(); + for (IN element : buffer) { + copy.add(copy(element)); + } + return copy; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java new file mode 100644 index 0000000..ea891c9 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import java.util.LinkedList; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy; +import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback; +import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy; +import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; +import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; + +public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { + + /** + * Auto-generated serial version UID + */ + private static final long serialVersionUID = -8038984294071650730L; + + private LinkedList<TriggerPolicy<IN>> triggerPolicies; + private LinkedList<EvictionPolicy<IN>> evictionPolicies; + private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies; + private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies; + private LinkedList<Thread> activePolicyTreads; + protected LinkedList<IN> buffer; + private 
LinkedList<TriggerPolicy<IN>> currentTriggerPolicies; + + /** + * This constructor created a windowing invokable using trigger and eviction + * policies. + * + * @param userFunction + * The user defined {@link ReduceFunction} + * @param triggerPolicies + * A list of {@link TriggerPolicy}s and/or + * {@link ActiveTriggerPolicy}s + * @param evictionPolicies + * A list of {@link EvictionPolicy}s and/or + * {@link ActiveEvictionPolicy}s + */ + public WindowInvokable(Function userFunction, LinkedList<TriggerPolicy<IN>> triggerPolicies, + LinkedList<EvictionPolicy<IN>> evictionPolicies) { + super(userFunction); + + this.triggerPolicies = triggerPolicies; + this.evictionPolicies = evictionPolicies; + + activeTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>(); + for (TriggerPolicy<IN> tp : triggerPolicies) { + if (tp instanceof ActiveTriggerPolicy) { + activeTriggerPolicies.add((ActiveTriggerPolicy<IN>) tp); + } + } + + activeEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>(); + for (EvictionPolicy<IN> ep : evictionPolicies) { + if (ep instanceof ActiveEvictionPolicy) { + activeEvictionPolicies.add((ActiveEvictionPolicy<IN>) ep); + } + } + + this.activePolicyTreads = new LinkedList<Thread>(); + this.buffer = new LinkedList<IN>(); + this.currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>(); + } + + @Override + public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { + super.open(parameters); + for (ActiveTriggerPolicy<IN> tp : activeTriggerPolicies) { + Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp)); + if (target != null) { + Thread thread = new Thread(target); + activePolicyTreads.add(thread); + thread.start(); + } + } + }; + + /** + * This class allows the active trigger threads to call back and push fake + * elements at any time. 
+ */ + private class WindowingCallback implements ActiveTriggerCallback { + private ActiveTriggerPolicy<IN> policy; + + public WindowingCallback(ActiveTriggerPolicy<IN> policy) { + this.policy = policy; + } + + @Override + public void sendFakeElement(Object datapoint) { + processFakeElement(datapoint, this.policy); + } + + } + + @Override + public void invoke() throws Exception { + + // Prevent empty data streams + if (readNext() == null) { + throw new RuntimeException("DataStream must not be empty"); + } + + // Continuously run + while (nextRecord != null) { + processRealElement(nextRecord.getObject()); + + // Load next StreamRecord + readNext(); + } + + // Stop all remaining threads from policies + for (Thread t : activePolicyTreads) { + t.interrupt(); + } + + // finally trigger the buffer. + emitFinalWindow(null); + + } + + /** + * This method gets called in case of an grouped windowing in case central + * trigger occurred and the arriving element causing the trigger is not part + * of this group. + * + * Remark: This is NOT the same as + * {@link WindowInvokable#processFakeElement(Object, TriggerPolicy)}! Here + * the eviction using active policies takes place after the call to the UDF. + * Usually it is done before when fake elements get submitted. This special + * behaviour is needed to allow the {@link GroupedWindowInvokable} to send + * central triggers to all groups, even if the current element does not + * belong to the group. 
+ * + * @param input + * a fake input element + * @param policies + * the list of policies which caused the call with this fake + * element + */ + protected synchronized void externalTriggerFakeElement(IN input, + List<TriggerPolicy<IN>> policies) { + + // Set the current triggers + currentTriggerPolicies.addAll(policies); + + // emit + callUserFunctionAndLogException(); + + // clear the flag collection + currentTriggerPolicies.clear(); + + // Process the evictions and take care of double evictions + // In case there are multiple eviction policies present, + // only the one with the highest return value is recognized. + int currentMaxEviction = 0; + for (ActiveEvictionPolicy<IN> evictionPolicy : activeEvictionPolicies) { + // use temporary variable to prevent multiple calls to + // notifyEviction + int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size()); + if (tmp > currentMaxEviction) { + currentMaxEviction = tmp; + } + } + + for (int i = 0; i < currentMaxEviction; i++) { + try { + buffer.removeFirst(); + } catch (NoSuchElementException e) { + // In case no more elements are in the buffer: + // Prevent failure and stop deleting. + break; + } + } + } + + /** + * This method processed an arrived fake element The method is synchronized + * to ensure that it cannot interleave with + * {@link WindowInvokable#processRealElement(Object)} + * + * @param input + * a fake input element + * @param currentPolicy + * the policy which produced this fake element + */ + protected synchronized void processFakeElement(Object input, TriggerPolicy<IN> currentPolicy) { + + // Process the evictions and take care of double evictions + // In case there are multiple eviction policies present, + // only the one with the highest return value is recognized. 
+ int currentMaxEviction = 0; + for (ActiveEvictionPolicy<IN> evictionPolicy : activeEvictionPolicies) { + // use temporary variable to prevent multiple calls to + // notifyEviction + int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size()); + if (tmp > currentMaxEviction) { + currentMaxEviction = tmp; + } + } + + for (int i = 0; i < currentMaxEviction; i++) { + try { + buffer.removeFirst(); + } catch (NoSuchElementException e) { + // In case no more elements are in the buffer: + // Prevent failure and stop deleting. + break; + } + } + + // Set the current trigger + currentTriggerPolicies.add(currentPolicy); + + // emit + callUserFunctionAndLogException(); + + // clear the flag collection + currentTriggerPolicies.clear(); + } + + /** + * This method processed an arrived real element The method is synchronized + * to ensure that it cannot interleave with + * {@link WindowInvokable#processFakeElement(Object)}. + * + * @param input + * a real input element + * @param triggerPolicies + * Allows to set trigger policies which are maintained + * externally. This is the case for central policies in + * {@link GroupedWindowInvokable}. + */ + protected synchronized void processRealElement(IN input, List<TriggerPolicy<IN>> triggerPolicies) { + this.currentTriggerPolicies.addAll(triggerPolicies); + processRealElement(input); + } + + /** + * This method processed an arrived real element The method is synchronized + * to ensure that it cannot interleave with + * {@link WindowInvokable#processFakeElement(Object)} + * + * @param input + * a real input element + */ + protected synchronized void processRealElement(IN input) { + + // Run the precalls to detect missed windows + for (ActiveTriggerPolicy<IN> trigger : activeTriggerPolicies) { + // Remark: In case multiple active triggers are present the ordering + // of the different fake elements returned by this triggers becomes + // a problem. This might lead to unexpected results... 
+ // Should we limit the number of active triggers to 0 or 1? + Object[] result = trigger.preNotifyTrigger(input); + for (Object in : result) { + processFakeElement(in, trigger); + } + } + + // Remember if a trigger occurred + boolean isTriggered = false; + + // Process the triggers + for (TriggerPolicy<IN> triggerPolicy : triggerPolicies) { + if (triggerPolicy.notifyTrigger(input)) { + currentTriggerPolicies.add(triggerPolicy); + } + } + + // call user function + if (!currentTriggerPolicies.isEmpty()) { + // emit + callUserFunctionAndLogException(); + + // clear the flag collection + currentTriggerPolicies.clear(); + + // remember trigger + isTriggered = true; + } + + // Process the evictions and take care of double evictions + // In case there are multiple eviction policies present, + // only the one with the highest return value is recognized. + int currentMaxEviction = 0; + + for (EvictionPolicy<IN> evictionPolicy : evictionPolicies) { + // use temporary variable to prevent multiple calls to + // notifyEviction + int tmp = evictionPolicy.notifyEviction(input, isTriggered, buffer.size()); + if (tmp > currentMaxEviction) { + currentMaxEviction = tmp; + } + } + + for (int i = 0; i < currentMaxEviction; i++) { + try { + buffer.removeFirst(); + } catch (NoSuchElementException e) { + // In case no more elements are in the buffer: + // Prevent failure and stop deleting. + break; + } + } + + // Add the current element to the buffer + buffer.add(input); + + } + + /** + * This method removes the first element from the element buffer. It is used + * to provide central evictions in {@link GroupedWindowInvokable} + */ + protected synchronized void evictFirst() { + try { + buffer.removeFirst(); + } catch (NoSuchElementException e) { + // ignore exception + } + } + + /** + * This method returns whether the element buffer is empty or not. It is + * used to figure out if a group can be deleted or not when + * {@link GroupedWindowInvokable} is used. 
+ * + * @return true in case the buffer is empty otherwise false. + */ + protected boolean isBufferEmpty() { + return buffer.isEmpty(); + } + + /** + * This method does the final reduce at the end of the stream and emits the + * result. + * + * @param centralTriggerPolicies + * Allows to set trigger policies which are maintained + * externally. This is the case for central policies in + * {@link GroupedWindowInvokable}. + */ + protected void emitFinalWindow(List<TriggerPolicy<IN>> centralTriggerPolicies) { + if (!buffer.isEmpty()) { + currentTriggerPolicies.clear(); + + if (centralTriggerPolicies != null) { + currentTriggerPolicies.addAll(centralTriggerPolicies); + } + + for (TriggerPolicy<IN> policy : triggerPolicies) { + currentTriggerPolicies.add(policy); + } + + callUserFunctionAndLogException(); + } + } + +}