[FLINK-8260] [kafka] Reorder deprecated / regular constructors of FlinkKafkaProducer010
This commit moves deprecated factory methods of the FlinkKafkaProducer010 behind regular constructors, for better navigation and readability of the code. This closes #5179. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10f1acf9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10f1acf9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10f1acf9 Branch: refs/heads/release-1.4 Commit: 10f1acf92313cde7bf4ac8aa1403b19405d2ed25 Parents: 7197489 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Mon Dec 18 20:29:38 2017 -0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Fri Jan 5 22:03:40 2018 -0800 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaProducer010.java | 167 +++++++++---------- 1 file changed, 82 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/10f1acf9/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java index 21e3a10..0e64aa5 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java @@ -46,80 +46,6 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> { */ private boolean writeTimestampToKafka = false; - // ---------------------- "Constructors" for timestamp writing ------------------ - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) - * - * @param inStream The stream to write to Kafka - * @param topicId ID of the Kafka topic. - * @param serializationSchema User defined serialization schema supporting key/value messages - * @param producerConfig Properties with the producer configuration. - * - * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)} - * and call {@link #setWriteTimestampToKafka(boolean)}. - */ - @Deprecated - public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, - String topicId, - KeyedSerializationSchema<T> serializationSchema, - Properties producerConfig) { - return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to - * the topic. - * - * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) - * - * @param inStream The stream to write to Kafka - * @param topicId ID of the Kafka topic. - * @param serializationSchema User defined (keyless) serialization schema. - * @param producerConfig Properties with the producer configuration. - * - * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)} - * and call {@link #setWriteTimestampToKafka(boolean)}. - */ - @Deprecated - public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, - String topicId, - SerializationSchema<T> serializationSchema, - Properties producerConfig) { - return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>()); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) - * - * @param inStream The stream to write to Kafka - * @param topicId The name of the target topic - * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. - * - * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} - * and call {@link #setWriteTimestampToKafka(boolean)}. - */ - @Deprecated - public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, - String topicId, - KeyedSerializationSchema<T> serializationSchema, - Properties producerConfig, - FlinkKafkaPartitioner<T> customPartitioner) { - - FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner); - DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer); - return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer); - - } - // ---------------------- Regular constructors------------------ /** @@ -267,6 +193,18 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> { super(topicId, serializationSchema, producerConfig, customPartitioner); } + // ------------------- User configuration ---------------------- + + /** + * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. + * Timestamps must be positive for Kafka to accept them. + * + * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka. + */ + public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { + this.writeTimestampToKafka = writeTimestampToKafka; + } + // ----------------------------- Deprecated constructors / factory methods --------------------------- /** @@ -275,6 +213,76 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> { * * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) * + * @param inStream The stream to write to Kafka + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined serialization schema supporting key/value messages + * @param producerConfig Properties with the producer configuration. + * + * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)} + * and call {@link #setWriteTimestampToKafka(boolean)}. + */ + @Deprecated + public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, + String topicId, + KeyedSerializationSchema<T> serializationSchema, + Properties producerConfig) { + return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. + * + * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId ID of the Kafka topic. + * @param serializationSchema User defined (keyless) serialization schema. + * @param producerConfig Properties with the producer configuration. + * + * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)} + * and call {@link #setWriteTimestampToKafka(boolean)}. + */ + @Deprecated + public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, + String topicId, + SerializationSchema<T> serializationSchema, + Properties producerConfig) { + return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>()); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId The name of the target topic + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + * + * @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} + * and call {@link #setWriteTimestampToKafka(boolean)}. + */ + @Deprecated + public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, + String topicId, + KeyedSerializationSchema<T> serializationSchema, + Properties producerConfig, + FlinkKafkaPartitioner<T> customPartitioner) { + FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner); + DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer); + return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) + * * @param inStream The stream to write to Kafka * @param topicId The name of the target topic * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages @@ -332,17 +340,6 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> { super(topicId, serializationSchema, producerConfig, customPartitioner); } - /** - * If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. - * Timestamps must be positive for Kafka to accept them. - * - * @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka. - */ - public void setWriteTimestampToKafka(boolean writeTimestampToKafka) { - this.writeTimestampToKafka = writeTimestampToKafka; - } - - // ----------------------------- Generic element processing --------------------------- @Override
