[FLINK-7553] Use new SinkFunction interface in FlinkKafkaProducer010

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6886f638
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6886f638
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6886f638

Branch: refs/heads/master
Commit: 6886f638d70419e01ebdfc1cdbb6d834f3fb30b0
Parents: e7996b0
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Aug 29 15:53:16 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Sep 21 13:46:39 2017 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java | 226 +++++--------------
 .../kafka/FlinkKafkaProducerBase.java           |   2 +-
 .../connectors/kafka/KafkaProducerTestBase.java |   3 +
 .../api/functions/sink/SinkContextUtil.java     |   7 +-
 .../api/functions/sink/SinkFunction.java        |  26 +--
 .../streaming/api/operators/StreamSink.java     |  16 +-
 .../api/operators/StreamSinkOperatorTest.java   |   3 +-
 7 files changed, 81 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 3b9dff1..3b43a7e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,24 +17,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -43,32 +32,10 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 
 import java.util.Properties;
 
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic;
-import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
-
 /**
  * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x
- *
- * <p>Implementation note: This producer is a hybrid between a regular regular sink function (a)
- * and a custom operator (b).
- *
- * <p>For (a), the class implements the SinkFunction and RichFunction interfaces.
- * For (b), it extends the StreamTask class.
- *
- * <p>Details about approach (a):
- *  Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the
- *  DataStream.addSink() method.
- *  Since the APIs exposed in that variant do not allow accessing the the timestamp attached to the record
- *  the Kafka 0.10 producer has a second invocation option, approach (b).
- *
- * <p>Details about approach (b):
- *  Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
- *  FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
- *  can access the internal record timestamp of the record and write it to Kafka.
- *
- * <p>All methods and constructors in this class are marked with the approach they are needed for.
  */
-public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction, CheckpointedFunction {
+public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
 
        /**
         * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
@@ -87,7 +54,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
         * @param topicId ID of the Kafka topic.
         * @param serializationSchema User defined serialization schema supporting key/value messages
         * @param producerConfig Properties with the producer configuration.
+        *
+        * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+        * and call {@link #setWriteTimestampToKafka(boolean)}.
         */
+       @Deprecated
        public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
                                                                                        String topicId,
                                                                                        KeyedSerializationSchema<T> serializationSchema,
@@ -105,7 +76,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
         * @param topicId ID of the Kafka topic.
         * @param serializationSchema User defined (keyless) serialization schema.
         * @param producerConfig Properties with the producer configuration.
+        *
+        * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
+        * and call {@link #setWriteTimestampToKafka(boolean)}.
         */
+       @Deprecated
        public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
                                                                                        String topicId,
                                                                                        SerializationSchema<T> serializationSchema,
@@ -124,20 +99,24 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
         *  @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
         *  @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
         *  @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+        *
+        * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
+        * and call {@link #setWriteTimestampToKafka(boolean)}.
         */
+       @Deprecated
        public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
                                                                                        String topicId,
                                                                                        KeyedSerializationSchema<T> serializationSchema,
                                                                                        Properties producerConfig,
                                                                                        FlinkKafkaPartitioner<T> customPartitioner) {
 
-               GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
                FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
-               SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
-               return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+               DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
+               return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
+
        }
 
-       // ---------------------- Regular constructors w/o timestamp support  ------------------
+       // ---------------------- Regular constructors------------------
 
        /**
         * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
@@ -220,9 +199,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
         * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
         */
        public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
-               // We create a Kafka 09 producer instance here and only "override" (by intercepting) the
-               // invoke call.
-               super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner));
+               super(topicId, serializationSchema, producerConfig, customPartitioner);
        }
 
-       // ----------------------------- Deprecated constructors / factory methods  ---------------------------
@@ -250,11 +227,10 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
                                                                                        Properties producerConfig,
                                                                                        KafkaPartitioner<T> customPartitioner) {
 
-               GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class);
                FlinkKafkaProducer010<T> kafkaProducer =
                                new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
-               SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
-               return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer);
+               DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
+               return new FlinkKafkaProducer010Configuration<T>(streamSink, inStream, kafkaProducer);
        }
 
        /**
@@ -288,157 +264,75 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
        public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
                // We create a Kafka 09 producer instance here and only "override" (by intercepting) the
                // invoke call.
-               super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)));
+               super(topicId, serializationSchema, producerConfig, customPartitioner);
        }
 
-       // ----------------------------- Generic element processing  ---------------------------
+       /**
+        * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
+        * Timestamps must be positive for Kafka to accept them.
+        *
+        * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
+        */
+       public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+               this.writeTimestampToKafka = writeTimestampToKafka;
+       }
 
-       private void invokeInternal(T next, long elementTimestamp) throws Exception {
 
-               final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
+       // ----------------------------- Generic element processing  ---------------------------
 
-               internalProducer.checkErroneous();
+       @Override
+       public void invoke(T value, Context context) throws Exception {
 
-               byte[] serializedKey = internalProducer.schema.serializeKey(next);
-               byte[] serializedValue = internalProducer.schema.serializeValue(next);
-               String targetTopic = internalProducer.schema.getTargetTopic(next);
+               checkErroneous();
+
+               byte[] serializedKey = schema.serializeKey(value);
+               byte[] serializedValue = schema.serializeValue(value);
+               String targetTopic = schema.getTargetTopic(value);
                if (targetTopic == null) {
-                       targetTopic = internalProducer.defaultTopicId;
+                       targetTopic = defaultTopicId;
                }
 
                Long timestamp = null;
                if (this.writeTimestampToKafka) {
-                       timestamp = elementTimestamp;
+                       timestamp = context.timestamp();
                }
 
                ProducerRecord<byte[], byte[]> record;
-               int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
+               int[] partitions = topicPartitionsMap.get(targetTopic);
                if (null == partitions) {
-                       partitions = getPartitionsByTopic(targetTopic, internalProducer.producer);
-                       internalProducer.topicPartitionsMap.put(targetTopic, partitions);
+                       partitions = getPartitionsByTopic(targetTopic, producer);
+                       topicPartitionsMap.put(targetTopic, partitions);
                }
-               if (internalProducer.flinkKafkaPartitioner == null) {
+               if (flinkKafkaPartitioner == null) {
                        record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
                } else {
-                       record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
+                       record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(value, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue);
                }
-               if (internalProducer.flushOnCheckpoint) {
-                       synchronized (internalProducer.pendingRecordsLock) {
-                               internalProducer.pendingRecords++;
+               if (flushOnCheckpoint) {
+                       synchronized (pendingRecordsLock) {
+                               pendingRecords++;
                        }
                }
-               internalProducer.producer.send(record, internalProducer.callback);
-       }
-
-       // ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ----
-
-       // ---- Configuration setters
-
-       /**
-        * Defines whether the producer should fail on errors, or only log them.
-        * If this is set to true, then exceptions will be only logged, if set to false,
-        * exceptions will be eventually thrown and cause the streaming program to
-        * fail (and enter recovery).
-        *
-        * <p>Method is only accessible for approach (a) (see above)
-        *
-        * @param logFailuresOnly The flag to indicate logging-only on exceptions.
-        */
-       public void setLogFailuresOnly(boolean logFailuresOnly) {
-               final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-               internalProducer.setLogFailuresOnly(logFailuresOnly);
-       }
-
-       /**
-        * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
-        * to be acknowledged by the Kafka producer on a checkpoint.
-        * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
-        *
-        * <p>Method is only accessible for approach (a) (see above)
-        *
-        * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
-        */
-       public void setFlushOnCheckpoint(boolean flush) {
-               final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-               internalProducer.setFlushOnCheckpoint(flush);
-       }
-
-       /**
-        * This method is used for approach (a) (see above).
-        */
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-               internalProducer.open(parameters);
-       }
-
-       /**
-        * This method is used for approach (a) (see above).
-        */
-       @Override
-       public IterationRuntimeContext getIterationRuntimeContext() {
-               final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-               return internalProducer.getIterationRuntimeContext();
-       }
-
-       /**
-        * This method is used for approach (a) (see above).
-        */
-       @Override
-       public void setRuntimeContext(RuntimeContext t) {
-               final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-               internalProducer.setRuntimeContext(t);
-       }
-
-       /**
-        * Invoke method for using the Sink as DataStream.addSink() sink.
-        *
-        * <p>This method is used for approach (a) (see above)
-        *
-        * @param value The input record.
-        */
-       @Override
-       public void invoke(T value) throws Exception {
-               invokeInternal(value, Long.MAX_VALUE);
-       }
-
-       // ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ----
-
-       /**
-        * Process method for using the sink with timestamp support.
-        *
-        * <p>This method is used for approach (b) (see above)
-        */
-       @Override
-       public void processElement(StreamRecord<T> element) throws Exception {
-               invokeInternal(element.getValue(), element.getTimestamp());
-       }
-
-       @Override
-       public void initializeState(FunctionInitializationContext context) throws Exception {
-               final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-               internalProducer.initializeState(context);
-       }
-
-       @Override
-       public void snapshotState(FunctionSnapshotContext context) throws Exception {
-               final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction;
-               internalProducer.snapshotState(context);
+               producer.send(record, callback);
        }
 
        /**
         * Configuration object returned by the writeToKafkaWithTimestamps() call.
+        *
+        * <p>This is only kept because it's part of the public API. It is not necessary anymore, now
+        * that the {@link SinkFunction} interface provides timestamps.
         */
        public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
 
-               private final FlinkKafkaProducerBase wrappedProducerBase;
                private final FlinkKafkaProducer010 producer;
 
-               private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) {
+               private FlinkKafkaProducer010Configuration(
+                               DataStreamSink originalSink,
+                               DataStream<T> inputStream,
+                               FlinkKafkaProducer010<T> producer) {
                        //noinspection unchecked
-                       super(stream, producer);
+                       super(inputStream, originalSink.getTransformation().getOperator());
                        this.producer = producer;
-                       this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction;
                }
 
                /**
@@ -450,7 +344,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
                 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
                 */
                public void setLogFailuresOnly(boolean logFailuresOnly) {
-                       this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly);
+                       producer.setLogFailuresOnly(logFailuresOnly);
                }
 
                /**
@@ -461,7 +355,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
                 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
                 */
                public void setFlushOnCheckpoint(boolean flush) {
-                       this.wrappedProducerBase.setFlushOnCheckpoint(flush);
+                       producer.setFlushOnCheckpoint(flush);
                }
 
                /**
@@ -471,7 +365,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
                 * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
                 */
                public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
-                       this.producer.writeTimestampToKafka = writeTimestampToKafka;
+                       producer.writeTimestampToKafka = writeTimestampToKafka;
                }
        }
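
Migration note: with the producer now a plain SinkFunction, the deprecated
writeToKafkaWithTimestamps() entry point can be replaced by the ordinary
addSink() pattern. A minimal sketch of the recommended usage (topic name,
schema choice, and broker address are illustrative placeholders; "stream" is
assumed to be a DataStream<String> in scope):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

    FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
            "my-topic",                 // hypothetical topic
            new SimpleStringSchema(),   // keyless serialization schema
            props);
    // opt in to writing Flink's record timestamps into Kafka
    producer.setWriteTimestampToKafka(true);

    stream.addSink(producer);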
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 76a2f84..befc1a1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -276,7 +276,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
         *              The incoming data
         */
        @Override
-       public void invoke(IN next) throws Exception {
+       public void invoke(IN next, Context context) throws Exception {
                // propagate asynchronous errors
                checkErroneous();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 000de52..35607dd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -215,6 +215,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase {
         * This test sets KafkaProducer so that it will not automatically flush the data and
         * simulate network failure between Flink and Kafka to check whether FlinkKafkaProducer
         * flushed records manually on snapshotState.
+        *
+        * <p>Due to legacy reasons there are two different ways of instantiating a Kafka 0.10 sink. The
+        * parameter controls which method is used.
         */
        protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
                final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";

http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
index 2749560..3b02ad0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
@@ -43,14 +43,9 @@ public class SinkContextUtil {
                        }
 
                        @Override
-                       public long timestamp() {
+                       public Long timestamp() {
                                return timestamp;
                        }
-
-                       @Override
-                       public boolean hasTimestamp() {
-                               return true;
-                       }
                };
        }
 }
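
Since the Context built here now returns a boxed Long, a sink can be driven
directly in a unit test with a synthetic context. A small sketch, assuming the
enclosing factory method is SinkContextUtil.forTimestamp(long) (its name is
not visible in this hunk) and mySink is some SinkFunction<String>:

    SinkFunction.Context context = SinkContextUtil.forTimestamp(42L);
    mySink.invoke("some-record", context); // context.timestamp() returns 42L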

http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
index 15a77c4..74870bc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
@@ -31,22 +31,22 @@ import java.io.Serializable;
 public interface SinkFunction<IN> extends Function, Serializable {
 
        /**
-        * Function for standard sink behaviour. This function is called for every record.
-        *
-        * @param value The input record.
-        * @throws Exception
         * @deprecated Use {@link #invoke(Object, Context)}.
         */
        @Deprecated
-       default void invoke(IN value) throws Exception {
-       }
+       default void invoke(IN value) throws Exception {}
 
        /**
         * Writes the given value to the sink. This function is called for every record.
         *
+        * <p>You have to override this method when implementing a {@code SinkFunction}, this is a
+        * {@code default} method for backward compatibility with the old-style method only.
+        *
         * @param value The input record.
         * @param context Additional context about the input record.
-        * @throws Exception
+        *
+        * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+        *                   to fail and may trigger recovery.
         */
        default void invoke(IN value, Context context) throws Exception {
                invoke(value);
@@ -72,15 +72,9 @@ public interface SinkFunction<IN> extends Function, Serializable {
                long currentWatermark();
 
                /**
-                * Returns the timestamp of the current input record.
-                */
-               long timestamp();
-
-               /**
-                * Checks whether this record has a timestamp.
-                *
-                * @return True if the record has a timestamp, false if not.
+                * Returns the timestamp of the current input record or {@code null} if the element does not
+                * have an assigned timestamp.
                 */
-               boolean hasTimestamp();
+               Long timestamp();
        }
 }
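
Under the new contract, implementers override the two-argument invoke() and
treat the timestamp as nullable instead of probing hasTimestamp(). A minimal
sketch of a custom sink against the new interface (class name and printing
behavior are illustrative only):

    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    public class PrintingSink<T> implements SinkFunction<T> {
        @Override
        public void invoke(T value, Context context) throws Exception {
            // timestamp() now returns null when the record has no assigned timestamp
            Long timestamp = context.timestamp();
            if (timestamp != null) {
                System.out.println(value + " @ " + timestamp);
            } else {
                System.out.println(value);
            }
        }
    }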

http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index f4b09af..667e130 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -91,19 +91,11 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
                }
 
                @Override
-               public long timestamp() {
-                       if (!element.hasTimestamp()) {
-                               throw new IllegalStateException(
-                                       "Record has no timestamp. Is the time 
characteristic set to 'ProcessingTime', or " +
-                                                       "did you forget to call 
'DataStream.assignTimestampsAndWatermarks(...)'?");
-
+               public Long timestamp() {
+                       if (element.hasTimestamp()) {
+                               return element.getTimestamp();
                        }
-                       return element.getTimestamp();
-               }
-
-               public boolean hasTimestamp() {
-                       return element.hasTimestamp();
+                       return null;
                }
-
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
index 500a52a..9085ade 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
@@ -96,7 +96,8 @@ public class StreamSinkOperatorTest extends TestLogger {
                @Override
                public void invoke(
                        T value, Context context) throws Exception {
-                       if (context.hasTimestamp()) {
+                       Long timestamp = context.timestamp();
+                       if (timestamp != null) {
                                data.add(
                                        new Tuple4<>(
                                                context.currentWatermark(),
