Repository: flink Updated Branches: refs/heads/release-1.3 168378d98 -> 6714f4a39
[FLINK-7764] [kafka] Enable the operator settings for FlinkKafkaProducer010 [hotfix] [kafka] Fix the config parameter names in KafkaTestBase. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6714f4a3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6714f4a3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6714f4a3 Branch: refs/heads/release-1.3 Commit: 6714f4a3917bc1fd29529c34115011c71f0686e3 Parents: 168378d Author: Xingcan Cui <[email protected]> Authored: Wed Oct 11 23:42:29 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Nov 2 16:29:12 2017 +0100 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaProducer010.java | 63 ++++++++++++++++++-- 1 file changed, 57 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6714f4a3/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java index 7909ba6..1019c09 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java @@ -31,7 +31,10 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.api.transformations.SinkTransformation; +import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; @@ -135,8 +138,9 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner); - SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer); - return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer); + SingleOutputStreamOperator<Object> streamOperator = inStream.transform + ("FlinkKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer); + return new FlinkKafkaProducer010Configuration<>(streamOperator, kafkaProducer); } // ---------------------- Regular constructors w/o timestamp support ------------------ @@ -255,8 +259,9 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)); - SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer); - return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer); + SingleOutputStreamOperator<Object> streamOperator = inStream.transform + ("FlinkKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer); + return new FlinkKafkaProducer010Configuration<>(streamOperator, kafkaProducer); } /** @@ -440,12 +445,14 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct private final FlinkKafkaProducerBase wrappedProducerBase; private final FlinkKafkaProducer010 producer; + private final StreamTransformation transformation; private FlinkKafkaProducer010Configuration(DataStream stream, FlinkKafkaProducer010<T> producer) { //noinspection unchecked super(stream, producer); - this.producer = producer; this.wrappedProducerBase = (FlinkKafkaProducerBase) producer.userFunction; + this.producer = producer; + this.transformation = stream.getTransformation(); } /** @@ -480,7 +487,51 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { this.producer.writeTimestampToKafka = writeTimestampToKafka; } - } + // ************************************************************************* + // Override methods to use the transformation in this class. + // ************************************************************************* + + @Override + public SinkTransformation<T> getTransformation() { + throw new UnsupportedOperationException("The SinkTransformation is not accessible " + + "from " + this.getClass().getSimpleName()); + } + + @Override + public DataStreamSink<T> name(String name) { + transformation.setName(name); + return this; + } + + @Override + public DataStreamSink<T> uid(String uid) { + transformation.setUid(uid); + return this; + } + @Override + public DataStreamSink<T> setUidHash(String uidHash) { + transformation.setUidHash(uidHash); + return this; + } + + @Override + public DataStreamSink<T> setParallelism(int parallelism) { + transformation.setParallelism(parallelism); + return this; + } + + @Override + public DataStreamSink<T> disableChaining() { + this.transformation.setChainingStrategy(ChainingStrategy.NEVER); + return this; + } + + @Override + public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) { + transformation.setSlotSharingGroup(slotSharingGroup); + return this; + } + } }
