[FLINK-8260] [kafka] Reorder deprecated / regular constructors of 
FlinkKafkaProducer010

This commit moves deprecated factory methods of the
FlinkKafkaProducer010 behind regular constructors, for better navigation
and readability of the code.

This closes #5179.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10f1acf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10f1acf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10f1acf9

Branch: refs/heads/release-1.4
Commit: 10f1acf92313cde7bf4ac8aa1403b19405d2ed25
Parents: 7197489
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Mon Dec 18 20:29:38 2017 -0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Jan 5 22:03:40 2018 -0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java | 167 +++++++++----------
 1 file changed, 82 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10f1acf9/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 21e3a10..0e64aa5 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -46,80 +46,6 @@ public class FlinkKafkaProducer010<T> extends 
FlinkKafkaProducer09<T> {
         */
        private boolean writeTimestampToKafka = false;
 
-       // ---------------------- "Constructors" for timestamp writing 
------------------
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
-        *
-        * <p>This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
-        *
-        * @param inStream The stream to write to Kafka
-        * @param topicId ID of the Kafka topic.
-        * @param serializationSchema User defined serialization schema 
supporting key/value messages
-        * @param producerConfig Properties with the producer configuration.
-        *
-        * @deprecated Use {@link #FlinkKafkaProducer010(String, 
KeyedSerializationSchema, Properties)}
-        * and call {@link #setWriteTimestampToKafka(boolean)}.
-        */
-       @Deprecated
-       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
-                                                                               
                                                                                
        String topicId,
-                                                                               
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
-                                                                               
                                                                                
        Properties producerConfig) {
-               return writeToKafkaWithTimestamps(inStream, topicId, 
serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
-        * the topic.
-        *
-        * <p>This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
-        *
-        * @param inStream The stream to write to Kafka
-        * @param topicId ID of the Kafka topic.
-        * @param serializationSchema User defined (keyless) serialization 
schema.
-        * @param producerConfig Properties with the producer configuration.
-        *
-        * @deprecated Use {@link #FlinkKafkaProducer010(String, 
KeyedSerializationSchema, Properties)}
-        * and call {@link #setWriteTimestampToKafka(boolean)}.
-        */
-       @Deprecated
-       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
-                                                                               
                                                                                
        String topicId,
-                                                                               
                                                                                
        SerializationSchema<T> serializationSchema,
-                                                                               
                                                                                
        Properties producerConfig) {
-               return writeToKafkaWithTimestamps(inStream, topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FlinkFixedPartitioner<T>());
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
-        *
-        * <p>This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
-        *
-        *  @param inStream The stream to write to Kafka
-        *  @param topicId The name of the target topic
-        *  @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
-        *  @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        *  @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
-        *
-        * @deprecated Use {@link #FlinkKafkaProducer010(String, 
KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
-        * and call {@link #setWriteTimestampToKafka(boolean)}.
-        */
-       @Deprecated
-       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
-                                                                               
                                                                                
        String topicId,
-                                                                               
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
-                                                                               
                                                                                
        Properties producerConfig,
-                                                                               
                                                                                
        FlinkKafkaPartitioner<T> customPartitioner) {
-
-               FlinkKafkaProducer010<T> kafkaProducer = new 
FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, 
customPartitioner);
-               DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
-               return new FlinkKafkaProducer010Configuration<>(streamSink, 
inStream, kafkaProducer);
-
-       }
-
        // ---------------------- Regular constructors------------------
 
        /**
@@ -267,6 +193,18 @@ public class FlinkKafkaProducer010<T> extends 
FlinkKafkaProducer09<T> {
                super(topicId, serializationSchema, producerConfig, 
customPartitioner);
        }
 
+       // ------------------- User configuration ----------------------
+
+       /**
+        * If set to true, Flink will write the (event time) timestamp attached 
to each record into Kafka.
+        * Timestamps must be positive for Kafka to accept them.
+        *
+        * @param writeTimestampToKafka Flag indicating if Flink's internal 
timestamps are written to Kafka.
+        */
+       public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
+               this.writeTimestampToKafka = writeTimestampToKafka;
+       }
+
        // ----------------------------- Deprecated constructors / factory 
methods  ---------------------------
 
        /**
@@ -275,6 +213,76 @@ public class FlinkKafkaProducer010<T> extends 
FlinkKafkaProducer09<T> {
         *
         * <p>This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
         *
+        * @param inStream The stream to write to Kafka
+        * @param topicId ID of the Kafka topic.
+        * @param serializationSchema User defined serialization schema 
supporting key/value messages
+        * @param producerConfig Properties with the producer configuration.
+        *
+        * @deprecated Use {@link #FlinkKafkaProducer010(String, 
KeyedSerializationSchema, Properties)}
+        * and call {@link #setWriteTimestampToKafka(boolean)}.
+        */
+       @Deprecated
+       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
+                                                                               
                                                                                
        String topicId,
+                                                                               
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
+                                                                               
                                                                                
        Properties producerConfig) {
+               return writeToKafkaWithTimestamps(inStream, topicId, 
serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
+        * the topic.
+        *
+        * <p>This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
+        *
+        * @param inStream The stream to write to Kafka
+        * @param topicId ID of the Kafka topic.
+        * @param serializationSchema User defined (keyless) serialization 
schema.
+        * @param producerConfig Properties with the producer configuration.
+        *
+        * @deprecated Use {@link #FlinkKafkaProducer010(String, 
KeyedSerializationSchema, Properties)}
+        * and call {@link #setWriteTimestampToKafka(boolean)}.
+        */
+       @Deprecated
+       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
+                                                                               
                                                                                
        String topicId,
+                                                                               
                                                                                
        SerializationSchema<T> serializationSchema,
+                                                                               
                                                                                
        Properties producerConfig) {
+               return writeToKafkaWithTimestamps(inStream, topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FlinkFixedPartitioner<T>());
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+        * the topic.
+        *
+        * <p>This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
+        *
+        *  @param inStream The stream to write to Kafka
+        *  @param topicId The name of the target topic
+        *  @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        *  @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        *  @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        *
+        * @deprecated Use {@link #FlinkKafkaProducer010(String, 
KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
+        * and call {@link #setWriteTimestampToKafka(boolean)}.
+        */
+       @Deprecated
+       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
+                                                                               
                                                                                
        String topicId,
+                                                                               
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
+                                                                               
                                                                                
        Properties producerConfig,
+                                                                               
                                                                                
        FlinkKafkaPartitioner<T> customPartitioner) {
+               FlinkKafkaProducer010<T> kafkaProducer = new 
FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, 
customPartitioner);
+               DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
+               return new FlinkKafkaProducer010Configuration<>(streamSink, 
inStream, kafkaProducer);
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+        * the topic.
+        *
+        * <p>This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
+        *
         *  @param inStream The stream to write to Kafka
         *  @param topicId The name of the target topic
         *  @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
@@ -332,17 +340,6 @@ public class FlinkKafkaProducer010<T> extends 
FlinkKafkaProducer09<T> {
                super(topicId, serializationSchema, producerConfig, 
customPartitioner);
        }
 
-       /**
-        * If set to true, Flink will write the (event time) timestamp attached 
to each record into Kafka.
-        * Timestamps must be positive for Kafka to accept them.
-        *
-        * @param writeTimestampToKafka Flag indicating if Flink's internal 
timestamps are written to Kafka.
-        */
-       public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
-               this.writeTimestampToKafka = writeTimestampToKafka;
-       }
-
-
        // ----------------------------- Generic element processing  
---------------------------
 
        @Override

Reply via email to