Repository: flink Updated Branches: refs/heads/master 9b0ba7ba3 -> 6886f638d
[FLINK-7552] Extend SinkFunction interface with SinkContext Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7996b0d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7996b0d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7996b0d Branch: refs/heads/master Commit: e7996b0d0ff5fe705a3830f3855f977cad4f0c44 Parents: 9b0ba7b Author: Aljoscha Krettek <[email protected]> Authored: Tue Aug 29 15:50:56 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Sep 21 13:46:17 2017 +0200 ---------------------------------------------------------------------- .../cassandra/CassandraConnectorITCase.java | 3 +- .../kafka/FlinkKafkaProducerBaseTest.java | 3 +- .../connectors/rabbitmq/common/RMQSinkTest.java | 7 +- .../api/functions/sink/RichSinkFunction.java | 3 - .../api/functions/sink/SinkContextUtil.java | 56 +++++++++ .../api/functions/sink/SinkFunction.java | 48 +++++++- .../streaming/api/operators/StreamSink.java | 60 +++++++++- .../api/functions/PrintSinkFunctionTest.java | 13 ++- .../functions/sink/SocketClientSinkTest.java | 8 +- .../api/operators/StreamSinkOperatorTest.java | 117 +++++++++++++++++++ pom.xml | 2 + 11 files changed, 300 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index bc5d1a8..f52a42c 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -36,6 +36,7 @@ import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkContextUtil; import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; import com.datastax.driver.core.Cluster; @@ -459,7 +460,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri sink.open(new Configuration()); for (scala.Tuple3<String, Integer, Integer> value : scalaTupleCollection) { - sink.invoke(value); + sink.invoke(value, SinkContextUtil.forTimestamp(0)); } sink.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java index 08c5f01..6b4b6ff 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.functions.sink.SinkContextUtil; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; @@ -117,7 +118,7 @@ public class FlinkKafkaProducerBaseTest { producer.open(new Configuration()); verify(mockPartitioner, times(1)).open(0, 1); - producer.invoke("foobar"); + producer.invoke("foobar", SinkContextUtil.forTimestamp(0)); verify(mockPartitioner, times(1)).partition( "foobar", null, "foobar".getBytes(), DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3}); } http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java index 540a7ba..4fb6097 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.rabbitmq.common; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.SinkContextUtil; import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; import org.apache.flink.streaming.util.serialization.SerializationSchema; @@ -91,7 +92,7 @@ public class RMQSinkTest { public void invokePublishBytesToQueue() throws Exception { RMQSink<String> rmqSink = createRMQSink(); - rmqSink.invoke(MESSAGE_STR); + rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0)); verify(serializationSchema).serialize(MESSAGE_STR); verify(channel).basicPublish("", QUEUE_NAME, null, MESSAGE); } @@ -101,7 +102,7 @@ public class RMQSinkTest { RMQSink<String> rmqSink = createRMQSink(); doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE); - rmqSink.invoke("msg"); + rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0)); } @Test @@ -110,7 +111,7 @@ public class RMQSinkTest { rmqSink.setLogFailuresOnly(true); doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE); - rmqSink.invoke("msg"); + rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0)); } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java index 64c38b9..66a0c93 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java @@ -27,7 +27,4 @@ import org.apache.flink.api.common.functions.AbstractRichFunction; public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> { private static final long serialVersionUID = 1L; - - public abstract void invoke(IN value) throws Exception; - } http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java new file mode 100644 index 0000000..2749560 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink; + +import org.apache.flink.annotation.Internal; + +/** + * Utility for creating Sink {@link SinkFunction.Context Contexts}. + */ +@Internal +public class SinkContextUtil { + + /** + * Creates a {@link SinkFunction.Context} that + * throws an exception when trying to access the current watermark or processing time. + */ + public static <T> SinkFunction.Context<T> forTimestamp(long timestamp) { + return new SinkFunction.Context<T>() { + @Override + public long currentProcessingTime() { + throw new RuntimeException("Not implemented"); + } + + @Override + public long currentWatermark() { + throw new RuntimeException("Not implemented"); + } + + @Override + public long timestamp() { + return timestamp; + } + + @Override + public boolean hasTimestamp() { + return true; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java index cb9e11d..15a77c4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java @@ -35,6 +35,52 @@ public interface SinkFunction<IN> extends Function, Serializable { * * @param value The input record. * @throws Exception + * @deprecated Use {@link #invoke(Object, Context)}. */ - void invoke(IN value) throws Exception; + @Deprecated + default void invoke(IN value) throws Exception { + } + + /** + * Writes the given value to the sink. This function is called for every record. + * + * @param value The input record. + * @param context Additional context about the input record. + * @throws Exception + */ + default void invoke(IN value, Context context) throws Exception { + invoke(value); + } + + /** + * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about + * an input record. + * + * <p>The context is only valid for the duration of a + * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use + * afterwards! + * + * @param <T> The type of elements accepted by the sink. + */ + @Public // Interface might be extended in the future with additional methods. + interface Context<T> { + + /** Returns the current processing time. */ + long currentProcessingTime(); + + /** Returns the current event-time watermark. */ + long currentWatermark(); + + /** + * Returns the timestamp of the current input record. + */ + long timestamp(); + + /** + * Checks whether this record has a timestamp. + * + * @return True if the record has a timestamp, false if not. + */ + boolean hasTimestamp(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java index d92d789..f4b09af 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java @@ -19,8 +19,10 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; /** * A {@link StreamOperator} for executing {@link SinkFunction SinkFunctions}. @@ -31,14 +33,27 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti private static final long serialVersionUID = 1L; + private transient SimpleContext sinkContext; + + /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */ + private long currentWatermark = Long.MIN_VALUE; + public StreamSink(SinkFunction<IN> sinkFunction) { super(sinkFunction); chainingStrategy = ChainingStrategy.ALWAYS; } @Override + public void open() throws Exception { + super.open(); + + this.sinkContext = new SimpleContext<>(getProcessingTimeService()); + } + + @Override public void processElement(StreamRecord<IN> element) throws Exception { - userFunction.invoke(element.getValue()); + sinkContext.element = element; + userFunction.invoke(element.getValue(), sinkContext); } @Override @@ -48,4 +63,47 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti // sinks don't forward latency markers } + + @Override + public void processWatermark(Watermark mark) throws Exception { + super.processWatermark(mark); + this.currentWatermark = mark.getTimestamp(); + } + + private class SimpleContext<IN> implements SinkFunction.Context<IN> { + + private StreamRecord<IN> element; + + private final ProcessingTimeService processingTimeService; + + public SimpleContext(ProcessingTimeService processingTimeService) { + this.processingTimeService = processingTimeService; + } + + @Override + public long currentProcessingTime() { + return processingTimeService.getCurrentProcessingTime(); + } + + @Override + public long currentWatermark() { + return currentWatermark; + } + + @Override + public long timestamp() { + if (!element.hasTimestamp()) { + throw new IllegalStateException( + "Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " + + "did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?"); + + } + return element.getTimestamp(); + } + + public boolean hasTimestamp() { + return element.hasTimestamp(); + } + + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java index 9e7ecdd..8c303d1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.api.functions.sink.SinkContextUtil; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.junit.After; @@ -40,7 +41,7 @@ public class PrintSinkFunctionTest { private String line = System.lineSeparator(); @Test - public void testPrintSinkStdOut(){ + public void testPrintSinkStdOut() throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream stream = new PrintStream(baos); System.setOut(stream); @@ -55,7 +56,7 @@ public class PrintSinkFunctionTest { Assert.fail(); } printSink.setTargetToStandardOut(); - printSink.invoke("hello world!"); + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); assertEquals("Print to System.out", printSink.toString()); assertEquals("hello world!" + line, baos.toString()); @@ -65,7 +66,7 @@ public class PrintSinkFunctionTest { } @Test - public void testPrintSinkStdErr(){ + public void testPrintSinkStdErr() throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream stream = new PrintStream(baos); System.setOut(stream); @@ -80,7 +81,7 @@ public class PrintSinkFunctionTest { Assert.fail(); } printSink.setTargetToStandardErr(); - printSink.invoke("hello world!"); + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); assertEquals("Print to System.err", printSink.toString()); assertEquals("hello world!" + line, baos.toString()); @@ -90,7 +91,7 @@ public class PrintSinkFunctionTest { } @Test - public void testPrintSinkWithPrefix(){ + public void testPrintSinkWithPrefix() throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream stream = new PrintStream(baos); System.setOut(stream); @@ -107,7 +108,7 @@ public class PrintSinkFunctionTest { Assert.fail(); } printSink.setTargetToStandardErr(); - printSink.invoke("hello world!"); + printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0)); assertEquals("Print to System.err", printSink.toString()); assertEquals("2> hello world!" + line, baos.toString()); http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java index 63e83d2..6cdce11 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java @@ -74,7 +74,7 @@ public class SocketClientSinkTest extends TestLogger { try { SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0); simpleSink.open(new Configuration()); - simpleSink.invoke(TEST_MESSAGE + '\n'); + simpleSink.invoke(TEST_MESSAGE + '\n', SinkContextUtil.forTimestamp(0)); simpleSink.close(); } catch (Throwable t) { @@ -117,7 +117,7 @@ public class SocketClientSinkTest extends TestLogger { public void run() { try { // need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT - simpleSink.invoke(TEST_MESSAGE + '\n'); + simpleSink.invoke(TEST_MESSAGE + '\n', SinkContextUtil.forTimestamp(0)); } catch (Throwable t) { error.set(t); @@ -182,7 +182,7 @@ public class SocketClientSinkTest extends TestLogger { // socket should be closed, so this should trigger a re-try // need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT while (true) { // we have to do this more often as the server side closed is not guaranteed to be noticed immediately - simpleSink.invoke(TEST_MESSAGE + '\n'); + simpleSink.invoke(TEST_MESSAGE + '\n', SinkContextUtil.forTimestamp(0)); } } catch (IOException e) { @@ -238,7 +238,7 @@ public class SocketClientSinkTest extends TestLogger { // Initial payload => this will be received by the server an then the socket will be // closed. - sink.invoke("0\n"); + sink.invoke("0\n", SinkContextUtil.forTimestamp(0)); // Get future an make sure there was no problem. This will rethrow any Exceptions from // the server. http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java new file mode 100644 index 0000000..500a52a --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link StreamSink}. + */ +public class StreamSinkOperatorTest extends TestLogger { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + /** + * Verify that we can correctly query watermark, processing time and the timestamp from the + * context. + */ + @Test + public void testTimeQuerying() throws Exception { + + BufferingQueryingSink<String> bufferingSink = new BufferingQueryingSink<>(); + + StreamSink<String> operator = new StreamSink<>(bufferingSink); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(17)); + testHarness.setProcessingTime(12); + testHarness.processElement(new StreamRecord<>("Hello", 12L)); + + testHarness.processWatermark(new Watermark(42)); + testHarness.setProcessingTime(15); + testHarness.processElement(new StreamRecord<>("Ciao", 13L)); + + testHarness.processWatermark(new Watermark(42)); + testHarness.setProcessingTime(15); + testHarness.processElement(new StreamRecord<>("Ciao")); + + assertThat(bufferingSink.data.size(), is(3)); + + assertThat(bufferingSink.data, + contains( + new Tuple4<>(17L, 12L, 12L, "Hello"), + new Tuple4<>(42L, 15L, 13L, "Ciao"), + new Tuple4<>(42L, 15L, null, "Ciao"))); + + testHarness.close(); + } + + private static class BufferingQueryingSink<T> implements SinkFunction<T> { + + // watermark, processing-time, timestamp, event + private final List<Tuple4<Long, Long, Long, T>> data; + + public BufferingQueryingSink() { + data = new ArrayList<>(); + } + + @Override + public void invoke( + T value, Context context) throws Exception { + if (context.hasTimestamp()) { + data.add( + new Tuple4<>( + context.currentWatermark(), + context.currentProcessingTime(), + context.timestamp(), + value)); + } else { + data.add( + new Tuple4<>( + context.currentWatermark(), + context.currentProcessingTime(), + null, + value)); + + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 561ec79..229e93d 100644 --- a/pom.xml +++ b/pom.xml @@ -1457,6 +1457,8 @@ under the License. <excludes> <exclude>@org.apache.flink.annotation.PublicEvolving</exclude> <exclude>@org.apache.flink.annotation.Internal</exclude> + <exclude>org.apache.flink.streaming.api.functions.sink.RichSinkFunction#invoke(java.lang.Object)</exclude> + <exclude>org.apache.flink.streaming.api.functions.sink.SinkFunction</exclude> </excludes> <accessModifier>public</accessModifier> <breakBuildOnModifications>false</breakBuildOnModifications>
