Repository: flink
Updated Branches:
  refs/heads/master 9b0ba7ba3 -> 6886f638d


[FLINK-7552] Extend SinkFunction interface with SinkContext


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7996b0d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7996b0d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7996b0d

Branch: refs/heads/master
Commit: e7996b0d0ff5fe705a3830f3855f977cad4f0c44
Parents: 9b0ba7b
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Aug 29 15:50:56 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Sep 21 13:46:17 2017 +0200

----------------------------------------------------------------------
 .../cassandra/CassandraConnectorITCase.java     |   3 +-
 .../kafka/FlinkKafkaProducerBaseTest.java       |   3 +-
 .../connectors/rabbitmq/common/RMQSinkTest.java |   7 +-
 .../api/functions/sink/RichSinkFunction.java    |   3 -
 .../api/functions/sink/SinkContextUtil.java     |  56 +++++++++
 .../api/functions/sink/SinkFunction.java        |  48 +++++++-
 .../streaming/api/operators/StreamSink.java     |  60 +++++++++-
 .../api/functions/PrintSinkFunctionTest.java    |  13 ++-
 .../functions/sink/SocketClientSinkTest.java    |   8 +-
 .../api/operators/StreamSinkOperatorTest.java   | 117 +++++++++++++++++++
 pom.xml                                         |   2 +
 11 files changed, 300 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index bc5d1a8..f52a42c 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 
 import com.datastax.driver.core.Cluster;
@@ -459,7 +460,7 @@ public class CassandraConnectorITCase extends 
WriteAheadSinkTestBase<Tuple3<Stri
 
                sink.open(new Configuration());
                for (scala.Tuple3<String, Integer, Integer> value : 
scalaTupleCollection) {
-                       sink.invoke(value);
+                       sink.invoke(value, SinkContextUtil.forTimestamp(0));
                }
                sink.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
index 08c5f01..6b4b6ff 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
@@ -117,7 +118,7 @@ public class FlinkKafkaProducerBaseTest {
                producer.open(new Configuration());
                verify(mockPartitioner, times(1)).open(0, 1);
 
-               producer.invoke("foobar");
+               producer.invoke("foobar", SinkContextUtil.forTimestamp(0));
                verify(mockPartitioner, times(1)).partition(
                        "foobar", null, "foobar".getBytes(), 
DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3});
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
 
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
index 540a7ba..4fb6097 100644
--- 
a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
+++ 
b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.rabbitmq.common;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
@@ -91,7 +92,7 @@ public class RMQSinkTest {
        public void invokePublishBytesToQueue() throws Exception {
                RMQSink<String> rmqSink = createRMQSink();
 
-               rmqSink.invoke(MESSAGE_STR);
+               rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
                verify(serializationSchema).serialize(MESSAGE_STR);
                verify(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
        }
@@ -101,7 +102,7 @@ public class RMQSinkTest {
                RMQSink<String> rmqSink = createRMQSink();
 
                doThrow(IOException.class).when(channel).basicPublish("", 
QUEUE_NAME, null, MESSAGE);
-               rmqSink.invoke("msg");
+               rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
        }
 
        @Test
@@ -110,7 +111,7 @@ public class RMQSinkTest {
                rmqSink.setLogFailuresOnly(true);
 
                doThrow(IOException.class).when(channel).basicPublish("", 
QUEUE_NAME, null, MESSAGE);
-               rmqSink.invoke("msg");
+               rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
        }
 
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
index 64c38b9..66a0c93 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
@@ -27,7 +27,4 @@ import 
org.apache.flink.api.common.functions.AbstractRichFunction;
 public abstract class RichSinkFunction<IN> extends AbstractRichFunction 
implements SinkFunction<IN> {
 
        private static final long serialVersionUID = 1L;
-
-       public abstract void invoke(IN value) throws Exception;
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
new file mode 100644
index 0000000..2749560
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Utility for creating Sink {@link SinkFunction.Context Contexts}.
+ */
+@Internal
+public class SinkContextUtil {
+
+       /**
+        * Creates a {@link SinkFunction.Context} that
+        * throws an exception when trying to access the current watermark or 
processing time.
+        */
+       public static <T> SinkFunction.Context<T> forTimestamp(long timestamp) {
+               return new SinkFunction.Context<T>() {
+                       @Override
+                       public long currentProcessingTime() {
+                               throw new RuntimeException("Not implemented");
+                       }
+
+                       @Override
+                       public long currentWatermark() {
+                               throw new RuntimeException("Not implemented");
+                       }
+
+                       @Override
+                       public long timestamp() {
+                               return timestamp;
+                       }
+
+                       @Override
+                       public boolean hasTimestamp() {
+                               return true;
+                       }
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
index cb9e11d..15a77c4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
@@ -35,6 +35,52 @@ public interface SinkFunction<IN> extends Function, 
Serializable {
         *
         * @param value The input record.
         * @throws Exception
+        * @deprecated Use {@link #invoke(Object, Context)}.
         */
-       void invoke(IN value) throws Exception;
+       @Deprecated
+       default void invoke(IN value) throws Exception {
+       }
+
+       /**
+        * Writes the given value to the sink. This function is called for 
every record.
+        *
+        * @param value The input record.
+        * @param context Additional context about the input record.
+        * @throws Exception
+        */
+       default void invoke(IN value, Context context) throws Exception {
+               invoke(value);
+       }
+
+       /**
+        * Context that {@link SinkFunction SinkFunctions } can use for getting 
additional data about
+        * an input record.
+        *
+        * <p>The context is only valid for the duration of a
+        * {@link SinkFunction#invoke(Object, Context)} call. Do not store the 
context and use
+        * afterwards!
+        *
+        * @param <T> The type of elements accepted by the sink.
+        */
+       @Public // Interface might be extended in the future with additional 
methods.
+       interface Context<T> {
+
+               /** Returns the current processing time. */
+               long currentProcessingTime();
+
+               /** Returns the current event-time watermark. */
+               long currentWatermark();
+
+               /**
+                * Returns the timestamp of the current input record.
+                */
+               long timestamp();
+
+               /**
+                * Checks whether this record has a timestamp.
+                *
+                * @return True if the record has a timestamp, false if not.
+                */
+               boolean hasTimestamp();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index d92d789..f4b09af 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -19,8 +19,10 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 /**
  * A {@link StreamOperator} for executing {@link SinkFunction SinkFunctions}.
@@ -31,14 +33,27 @@ public class StreamSink<IN> extends 
AbstractUdfStreamOperator<Object, SinkFuncti
 
        private static final long serialVersionUID = 1L;
 
+       private transient SimpleContext sinkContext;
+
+       /** We listen to this ourselves because we don't have an {@link 
InternalTimerService}. */
+       private long currentWatermark = Long.MIN_VALUE;
+
        public StreamSink(SinkFunction<IN> sinkFunction) {
                super(sinkFunction);
                chainingStrategy = ChainingStrategy.ALWAYS;
        }
 
        @Override
+       public void open() throws Exception {
+               super.open();
+
+               this.sinkContext = new 
SimpleContext<>(getProcessingTimeService());
+       }
+
+       @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
-               userFunction.invoke(element.getValue());
+               sinkContext.element = element;
+               userFunction.invoke(element.getValue(), sinkContext);
        }
 
        @Override
@@ -48,4 +63,47 @@ public class StreamSink<IN> extends 
AbstractUdfStreamOperator<Object, SinkFuncti
 
                // sinks don't forward latency markers
        }
+
+       @Override
+       public void processWatermark(Watermark mark) throws Exception {
+               super.processWatermark(mark);
+               this.currentWatermark = mark.getTimestamp();
+       }
+
+       private class SimpleContext<IN> implements SinkFunction.Context<IN> {
+
+               private StreamRecord<IN> element;
+
+               private final ProcessingTimeService processingTimeService;
+
+               public SimpleContext(ProcessingTimeService 
processingTimeService) {
+                       this.processingTimeService = processingTimeService;
+               }
+
+               @Override
+               public long currentProcessingTime() {
+                       return processingTimeService.getCurrentProcessingTime();
+               }
+
+               @Override
+               public long currentWatermark() {
+                       return currentWatermark;
+               }
+
+               @Override
+               public long timestamp() {
+                       if (!element.hasTimestamp()) {
+                               throw new IllegalStateException(
+                                       "Record has no timestamp. Is the time 
characteristic set to 'ProcessingTime', or " +
+                                                       "did you forget to call 
'DataStream.assignTimestampsAndWatermarks(...)'?");
+
+                       }
+                       return element.getTimestamp();
+               }
+
+               public boolean hasTimestamp() {
+                       return element.hasTimestamp();
+               }
+
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
index 9e7ecdd..8c303d1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import org.junit.After;
@@ -40,7 +41,7 @@ public class PrintSinkFunctionTest {
        private String line = System.lineSeparator();
 
        @Test
-       public void testPrintSinkStdOut(){
+       public void testPrintSinkStdOut() throws Exception {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                PrintStream stream = new PrintStream(baos);
                System.setOut(stream);
@@ -55,7 +56,7 @@ public class PrintSinkFunctionTest {
                        Assert.fail();
                }
                printSink.setTargetToStandardOut();
-               printSink.invoke("hello world!");
+               printSink.invoke("hello world!", 
SinkContextUtil.forTimestamp(0));
 
                assertEquals("Print to System.out", printSink.toString());
                assertEquals("hello world!" + line, baos.toString());
@@ -65,7 +66,7 @@ public class PrintSinkFunctionTest {
        }
 
        @Test
-       public void testPrintSinkStdErr(){
+       public void testPrintSinkStdErr() throws Exception {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                PrintStream stream = new PrintStream(baos);
                System.setOut(stream);
@@ -80,7 +81,7 @@ public class PrintSinkFunctionTest {
                        Assert.fail();
                }
                printSink.setTargetToStandardErr();
-               printSink.invoke("hello world!");
+               printSink.invoke("hello world!", 
SinkContextUtil.forTimestamp(0));
 
                assertEquals("Print to System.err", printSink.toString());
                assertEquals("hello world!" + line, baos.toString());
@@ -90,7 +91,7 @@ public class PrintSinkFunctionTest {
        }
 
        @Test
-       public void testPrintSinkWithPrefix(){
+       public void testPrintSinkWithPrefix() throws Exception {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                PrintStream stream = new PrintStream(baos);
                System.setOut(stream);
@@ -107,7 +108,7 @@ public class PrintSinkFunctionTest {
                        Assert.fail();
                }
                printSink.setTargetToStandardErr();
-               printSink.invoke("hello world!");
+               printSink.invoke("hello world!", 
SinkContextUtil.forTimestamp(0));
 
                assertEquals("Print to System.err", printSink.toString());
                assertEquals("2> hello world!" + line, baos.toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index 63e83d2..6cdce11 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -74,7 +74,7 @@ public class SocketClientSinkTest extends TestLogger {
                                try {
                                        SocketClientSink<String> simpleSink = 
new SocketClientSink<>(host, port, simpleSchema, 0);
                                        simpleSink.open(new Configuration());
-                                       simpleSink.invoke(TEST_MESSAGE + '\n');
+                                       simpleSink.invoke(TEST_MESSAGE + '\n', 
SinkContextUtil.forTimestamp(0));
                                        simpleSink.close();
                                }
                                catch (Throwable t) {
@@ -117,7 +117,7 @@ public class SocketClientSinkTest extends TestLogger {
                        public void run() {
                                try {
                                        // need two messages here: send a fin 
to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
-                                       simpleSink.invoke(TEST_MESSAGE + '\n');
+                                       simpleSink.invoke(TEST_MESSAGE + '\n', 
SinkContextUtil.forTimestamp(0));
                                }
                                catch (Throwable t) {
                                        error.set(t);
@@ -182,7 +182,7 @@ public class SocketClientSinkTest extends TestLogger {
                                // socket should be closed, so this should 
trigger a re-try
                                // need two messages here: send a fin to cancel 
the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
                                while (true) { // we have to do this more often 
as the server side closed is not guaranteed to be noticed immediately
-                                       simpleSink.invoke(TEST_MESSAGE + '\n');
+                                       simpleSink.invoke(TEST_MESSAGE + '\n', 
SinkContextUtil.forTimestamp(0));
                                }
                        }
                        catch (IOException e) {
@@ -238,7 +238,7 @@ public class SocketClientSinkTest extends TestLogger {
 
                        // Initial payload => this will be received by the 
server an then the socket will be
                        // closed.
-                       sink.invoke("0\n");
+                       sink.invoke("0\n", SinkContextUtil.forTimestamp(0));
 
                        // Get future an make sure there was no problem. This 
will rethrow any Exceptions from
                        // the server.

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
new file mode 100644
index 0000000..500a52a
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link StreamSink}.
+ */
+public class StreamSinkOperatorTest extends TestLogger {
+
+       @Rule
+       public ExpectedException expectedException = ExpectedException.none();
+
+       /**
+        * Verify that we can correctly query watermark, processing time and 
the timestamp from the
+        * context.
+        */
+       @Test
+       public void testTimeQuerying() throws Exception {
+
+               BufferingQueryingSink<String> bufferingSink = new 
BufferingQueryingSink<>();
+
+               StreamSink<String> operator = new StreamSink<>(bufferingSink);
+
+               OneInputStreamOperatorTestHarness<String, Object> testHarness =
+                               new 
OneInputStreamOperatorTestHarness<>(operator);
+
+               testHarness.setup();
+               testHarness.open();
+
+               testHarness.processWatermark(new Watermark(17));
+               testHarness.setProcessingTime(12);
+               testHarness.processElement(new StreamRecord<>("Hello", 12L));
+
+               testHarness.processWatermark(new Watermark(42));
+               testHarness.setProcessingTime(15);
+               testHarness.processElement(new StreamRecord<>("Ciao", 13L));
+
+               testHarness.processWatermark(new Watermark(42));
+               testHarness.setProcessingTime(15);
+               testHarness.processElement(new StreamRecord<>("Ciao"));
+
+               assertThat(bufferingSink.data.size(), is(3));
+
+               assertThat(bufferingSink.data,
+                       contains(
+                               new Tuple4<>(17L, 12L, 12L, "Hello"),
+                               new Tuple4<>(42L, 15L, 13L, "Ciao"),
+                               new Tuple4<>(42L, 15L, null, "Ciao")));
+
+               testHarness.close();
+       }
+
+       private static class BufferingQueryingSink<T> implements 
SinkFunction<T> {
+
+               // watermark, processing-time, timestamp, event
+               private final List<Tuple4<Long, Long, Long, T>> data;
+
+               public BufferingQueryingSink() {
+                       data = new ArrayList<>();
+               }
+
+               @Override
+               public void invoke(
+                       T value, Context context) throws Exception {
+                       if (context.hasTimestamp()) {
+                               data.add(
+                                       new Tuple4<>(
+                                               context.currentWatermark(),
+                                               context.currentProcessingTime(),
+                                               context.timestamp(),
+                                               value));
+                       } else {
+                               data.add(
+                                       new Tuple4<>(
+                                               context.currentWatermark(),
+                                               context.currentProcessingTime(),
+                                               null,
+                                               value));
+
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 561ec79..229e93d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1457,6 +1457,8 @@ under the License.
                                                        <excludes>
                                                                
<exclude>@org.apache.flink.annotation.PublicEvolving</exclude>
                                                                
<exclude>@org.apache.flink.annotation.Internal</exclude>
+                                                               
<exclude>org.apache.flink.streaming.api.functions.sink.RichSinkFunction#invoke(java.lang.Object)</exclude>
+                                                               
<exclude>org.apache.flink.streaming.api.functions.sink.SinkFunction</exclude>
                                                        </excludes>
                                                        
<accessModifier>public</accessModifier>
                                                        
<breakBuildOnModifications>false</breakBuildOnModifications>

Reply via email to