[hotfix] Don't use deprecated writeWithTimestamps in Kafka 0.10 tests

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a3621b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a3621b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a3621b8

Branch: refs/heads/master
Commit: 9a3621b842d2bf6b76e394f1412dd27475180ac2
Parents: 08bfdae
Author: Aljoscha Krettek <[email protected]>
Authored: Thu Sep 28 14:53:24 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Oct 9 18:58:36 2017 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kafka/KafkaTestEnvironmentImpl.java    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9a3621b8/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 5a5caad..d0e935b 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -168,7 +168,10 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
 
        @Override
        public <T> DataStreamSink<T> writeToKafkaWithTimestamps(DataStream<T> 
stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props) {
-               return FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, 
topic, serSchema, props);
+               FlinkKafkaProducer010<T> prod = new 
FlinkKafkaProducer010<>(topic, serSchema, props);
+               prod.setFlushOnCheckpoint(true);
+               prod.setWriteTimestampToKafka(true);
+               return stream.addSink(prod);
        }
 
        @Override

Reply via email to