[FLINK-7553] Use new SinkFunction interface in FlinkKafkaProducer010
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6886f638 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6886f638 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6886f638 Branch: refs/heads/master Commit: 6886f638d70419e01ebdfc1cdbb6d834f3fb30b0 Parents: e7996b0 Author: Aljoscha Krettek <[email protected]> Authored: Tue Aug 29 15:53:16 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Sep 21 13:46:39 2017 +0200 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaProducer010.java | 226 +++++-------------- .../kafka/FlinkKafkaProducerBase.java | 2 +- .../connectors/kafka/KafkaProducerTestBase.java | 3 + .../api/functions/sink/SinkContextUtil.java | 7 +- .../api/functions/sink/SinkFunction.java | 26 +-- .../streaming/api/operators/StreamSink.java | 16 +- .../api/operators/StreamSinkOperatorTest.java | 3 +- 7 files changed, 81 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java index 3b9dff1..3b43a7e 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java @@ -17,24 +17,13 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.common.functions.IterationRuntimeContext; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SerializationSchema; @@ -43,32 +32,10 @@ import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; -import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic; -import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList; - /** * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x - * - * <p>Implementation note: This producer is a hybrid between a regular regular sink function (a) - * and a custom operator (b). - * - * <p>For (a), the class implements the SinkFunction and RichFunction interfaces. - * For (b), it extends the StreamTask class. - * - * <p>Details about approach (a): - * Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the - * DataStream.addSink() method. - * Since the APIs exposed in that variant do not allow accessing the the timestamp attached to the record - * the Kafka 0.10 producer has a second invocation option, approach (b). - * - * <p>Details about approach (b): - * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the - * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer - * can access the internal record timestamp of the record and write it to Kafka. - * - * <p>All methods and constructors in this class are marked with the approach they are needed for. */ -public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction, CheckpointedFunction { +public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> { /** * Flag controlling whether we are writing the Flink record's timestamp into Kafka. @@ -87,7 +54,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct * @param topicId ID of the Kafka topic. * @param serializationSchema User defined serialization schema supporting key/value messages * @param producerConfig Properties with the producer configuration. + * + * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)} + * and call {@link #setWriteTimestampToKafka(boolean)}. */ + @Deprecated public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, String topicId, KeyedSerializationSchema<T> serializationSchema, @@ -105,7 +76,11 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct * @param topicId ID of the Kafka topic. * @param serializationSchema User defined (keyless) serialization schema. * @param producerConfig Properties with the producer configuration. + * + * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)} + * and call {@link #setWriteTimestampToKafka(boolean)}. */ + @Deprecated public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, String topicId, SerializationSchema<T> serializationSchema, @@ -124,20 +99,24 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + * + * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} + * and call {@link #setWriteTimestampToKafka(boolean)}. */ + @Deprecated public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) { - GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner); - SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer); - return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer); + DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer); + return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer); + } - // ---------------------- Regular constructors w/o timestamp support ------------------ + // ---------------------- Regular constructors------------------ /** * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to @@ -220,9 +199,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above) */ public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) { - // We create a Kafka 09 producer instance here and only "override" (by intercepting) the - // invoke call. - super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner)); + super(topicId, serializationSchema, producerConfig, customPartitioner); } // ----------------------------- Deprecated constructors / factory methods --------------------------- @@ -250,11 +227,10 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct Properties producerConfig, KafkaPartitioner<T> customPartitioner) { - GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)); - SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer); - return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer); + DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer); + return new FlinkKafkaProducer010Configuration<T>(streamSink, inStream, kafkaProducer); } /** @@ -288,157 +264,75 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) { // We create a Kafka 09 producer instance here and only "override" (by intercepting) the // invoke call. - super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner))); + super(topicId, serializationSchema, producerConfig, customPartitioner); } - // ----------------------------- Generic element processing --------------------------- + /** + * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. + * Timestamps must be positive for Kafka to accept them. + * + * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka. + */ + public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { + this.writeTimestampToKafka = writeTimestampToKafka; + } - private void invokeInternal(T next, long elementTimestamp) throws Exception { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; + // ----------------------------- Generic element processing --------------------------- - internalProducer.checkErroneous(); + @Override + public void invoke(T value, Context context) throws Exception { - byte[] serializedKey = internalProducer.schema.serializeKey(next); - byte[] serializedValue = internalProducer.schema.serializeValue(next); - String targetTopic = internalProducer.schema.getTargetTopic(next); + checkErroneous(); + + byte[] serializedKey = schema.serializeKey(value); + byte[] serializedValue = schema.serializeValue(value); + String targetTopic = schema.getTargetTopic(value); if (targetTopic == null) { - targetTopic = internalProducer.defaultTopicId; + targetTopic = defaultTopicId; } Long timestamp = null; if (this.writeTimestampToKafka) { - timestamp = elementTimestamp; + timestamp = context.timestamp(); } ProducerRecord<byte[], byte[]> record; - int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic); + int[] partitions = topicPartitionsMap.get(targetTopic); if (null == partitions) { - partitions = getPartitionsByTopic(targetTopic, internalProducer.producer); - internalProducer.topicPartitionsMap.put(targetTopic, partitions); + partitions = getPartitionsByTopic(targetTopic, producer); + topicPartitionsMap.put(targetTopic, partitions); } - if (internalProducer.flinkKafkaPartitioner == null) { + if (flinkKafkaPartitioner == null) { record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue); } else { - record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue); + record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(value, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue); } - if (internalProducer.flushOnCheckpoint) { - synchronized (internalProducer.pendingRecordsLock) { - internalProducer.pendingRecords++; + if (flushOnCheckpoint) { + synchronized (pendingRecordsLock) { + pendingRecords++; } } - internalProducer.producer.send(record, internalProducer.callback); - } - - // ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ---- - - // ---- Configuration setters - - /** - * Defines whether the producer should fail on errors, or only log them. - * If this is set to true, then exceptions will be only logged, if set to false, - * exceptions will be eventually thrown and cause the streaming program to - * fail (and enter recovery). - * - * <p>Method is only accessible for approach (a) (see above) - * - * @param logFailuresOnly The flag to indicate logging-only on exceptions. - */ - public void setLogFailuresOnly(boolean logFailuresOnly) { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - internalProducer.setLogFailuresOnly(logFailuresOnly); - } - - /** - * If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers - * to be acknowledged by the Kafka producer on a checkpoint. - * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. - * - * <p>Method is only accessible for approach (a) (see above) - * - * @param flush Flag indicating the flushing mode (true = flush on checkpoint) - */ - public void setFlushOnCheckpoint(boolean flush) { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - internalProducer.setFlushOnCheckpoint(flush); - } - - /** - * This method is used for approach (a) (see above). - */ - @Override - public void open(Configuration parameters) throws Exception { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - internalProducer.open(parameters); - } - - /** - * This method is used for approach (a) (see above). - */ - @Override - public IterationRuntimeContext getIterationRuntimeContext() { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - return internalProducer.getIterationRuntimeContext(); - } - - /** - * This method is used for approach (a) (see above). - */ - @Override - public void setRuntimeContext(RuntimeContext t) { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - internalProducer.setRuntimeContext(t); - } - - /** - * Invoke method for using the Sink as DataStream.addSink() sink. - * - * <p>This method is used for approach (a) (see above) - * - * @param value The input record. - */ - @Override - public void invoke(T value) throws Exception { - invokeInternal(value, Long.MAX_VALUE); - } - - // ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ---- - - /** - * Process method for using the sink with timestamp support. - * - * <p>This method is used for approach (b) (see above) - */ - @Override - public void processElement(StreamRecord<T> element) throws Exception { - invokeInternal(element.getValue(), element.getTimestamp()); - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - internalProducer.initializeState(context); - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - final FlinkKafkaProducerBase<T> internalProducer = (FlinkKafkaProducerBase<T>) userFunction; - internalProducer.snapshotState(context); + producer.send(record, callback); } /** * Configuration object returned by the writeToKafkaWithTimestamps() call. + * + * <p>This is only kept because it's part of the public API. It is not necessary anymore, now + * that the {@link SinkFunction} interface provides timestamps. */ public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> { - private final FlinkKafkaProducerBase wrappedProducerBase; private final FlinkKafkaProducer010 producer; - private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) { + private FlinkKafkaProducer010Configuration( + DataStreamSink originalSink, + DataStream<T> inputStream, + FlinkKafkaProducer010<T> producer) { //noinspection unchecked - super(stream, producer); + super(inputStream, originalSink.getTransformation().getOperator()); this.producer = producer; - this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction; } /** @@ -450,7 +344,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct * @param logFailuresOnly The flag to indicate logging-only on exceptions. */ public void setLogFailuresOnly(boolean logFailuresOnly) { - this.wrappedProducerBase.setLogFailuresOnly(logFailuresOnly); + producer.setLogFailuresOnly(logFailuresOnly); } /** @@ -461,7 +355,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct * @param flush Flag indicating the flushing mode (true = flush on checkpoint) */ public void setFlushOnCheckpoint(boolean flush) { - this.wrappedProducerBase.setFlushOnCheckpoint(flush); + producer.setFlushOnCheckpoint(flush); } /** @@ -471,7 +365,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka. */ public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { - this.producer.writeTimestampToKafka = writeTimestampToKafka; + producer.writeTimestampToKafka = writeTimestampToKafka; } } http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java index 76a2f84..befc1a1 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -276,7 +276,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im * The incoming data */ @Override - public void invoke(IN next) throws Exception { + public void invoke(IN next, Context context) throws Exception { // propagate asynchronous errors checkErroneous(); http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java index 000de52..35607dd 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java @@ -215,6 +215,9 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { * This test sets KafkaProducer so that it will not automatically flush the data and * simulate network failure between Flink and Kafka to check whether FlinkKafkaProducer * flushed records manually on snapshotState. + * + * <p>Due to legacy reasons there are two different ways of instantiating a Kafka 0.10 sink. The + * parameter controls which method is used. */ protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception { final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator"; http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java index 2749560..3b02ad0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java @@ -43,14 +43,9 @@ public class SinkContextUtil { } @Override - public long timestamp() { + public Long timestamp() { return timestamp; } - - @Override - public boolean hasTimestamp() { - return true; - } }; } } http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java index 15a77c4..74870bc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java @@ -31,22 +31,22 @@ import java.io.Serializable; public interface SinkFunction<IN> extends Function, Serializable { /** - * Function for standard sink behaviour. This function is called for every record. - * - * @param value The input record. - * @throws Exception * @deprecated Use {@link #invoke(Object, Context)}. */ @Deprecated - default void invoke(IN value) throws Exception { - } + default void invoke(IN value) throws Exception {} /** * Writes the given value to the sink. This function is called for every record. * + * <p>You have to override this method when implementing a {@code SinkFunction}, this is a + * {@code default} method for backward compatibility with the old-style method only. + * * @param value The input record. * @param context Additional context about the input record. - * @throws Exception + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. */ default void invoke(IN value, Context context) throws Exception { invoke(value); @@ -72,15 +72,9 @@ public interface SinkFunction<IN> extends Function, Serializable { long currentWatermark(); /** - * Returns the timestamp of the current input record. - */ - long timestamp(); - - /** - * Checks whether this record has a timestamp. - * - * @return True if the record has a timestamp, false if not. + * Returns the timestamp of the current input record or {@code null} if the element does not + * have an assigned timestamp. */ - boolean hasTimestamp(); + Long timestamp(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java index f4b09af..667e130 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java @@ -91,19 +91,11 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti } @Override - public long timestamp() { - if (!element.hasTimestamp()) { - throw new IllegalStateException( - "Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " + - "did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?"); - + public Long timestamp() { + if (element.hasTimestamp()) { + return element.getTimestamp(); } - return element.getTimestamp(); - } - - public boolean hasTimestamp() { - return element.hasTimestamp(); + return null; } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/6886f638/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java index 500a52a..9085ade 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java @@ -96,7 +96,8 @@ public class StreamSinkOperatorTest extends TestLogger { @Override public void invoke( T value, Context context) throws Exception { - if (context.hasTimestamp()) { + Long timestamp = context.timestamp(); + if (timestamp != null) { data.add( new Tuple4<>( context.currentWatermark(),
