[FLINK-8287] [kafka] Improve Kafka producer Javadocs / doc to clarify partitioning behaviour


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71974895
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71974895
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71974895

Branch: refs/heads/release-1.4
Commit: 71974895da966478f2e24fd36c08d7cf386a7050
Parents: c454ee3
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Mon Dec 18 20:21:20 2017 -0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Jan 5 22:03:20 2018 -0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    | 46 +++++++----
 .../connectors/kafka/FlinkKafkaProducer010.java | 83 ++++++++++++++++---
 .../connectors/kafka/FlinkKafkaProducer011.java | 87 +++++++++++++++++---
 .../connectors/kafka/FlinkKafkaProducer08.java  | 82 +++++++++++++++---
 .../connectors/kafka/FlinkKafkaProducer09.java  | 83 ++++++++++++++++---
 5 files changed, 324 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71974895/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index c6461f9..e2df5fd 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -462,11 +462,8 @@ FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
         "my-topic",                  // target topic
         new SimpleStringSchema());   // serialization schema
 
-// the following is necessary for at-least-once delivery guarantee
-myProducer.setLogFailuresOnly(false);   // "false" by default
-myProducer.setFlushOnCheckpoint(true);  // "false" by default
-
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
+// this method is not available for earlier Kafka versions
 myProducer.setWriteTimestampToKafka(true);
 
 stream.addSink(myProducer);
@@ -481,11 +478,8 @@ val myProducer = new FlinkKafkaProducer011[String](
         "my-topic",               // target topic
         new SimpleStringSchema)   // serialization schema
 
-// the following is necessary for at-least-once delivery guarantee
-myProducer.setLogFailuresOnly(false)   // "false" by default
-myProducer.setFlushOnCheckpoint(true)  // "false" by default
-
-// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka
+// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
+// this method is not available for earlier Kafka versions
 myProducer.setWriteTimestampToKafka(true)
 
 stream.addSink(myProducer)
@@ -505,11 +499,30 @@ are other constructor variants that allow providing the following:
 partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the
 constructor. This partitioner will be called for each record in the stream
 to determine which exact partition of the target topic the record should be sent to.
+ Please see [Kafka Producer Partitioning Scheme](#kafka-producer-partitioning-scheme) for more details.
 * *Advanced serialization schema*: Similar to the consumer,
 the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`,
 which allows serializing the key and value separately. It also allows overriding the target topic,
 so that one producer instance can send data to multiple topics.
  
+### Kafka Producer Partitioning Scheme
+
+By default, if a custom partitioner is not specified for the Flink Kafka Producer, the producer will use
+a `FlinkFixedPartitioner` that maps each Flink Kafka Producer parallel subtask to a single Kafka partition
+(i.e., all records received by a sink subtask will end up in the same Kafka partition).
+
+A custom partitioner can be implemented by extending the `FlinkKafkaPartitioner` class. All
+Kafka versions' constructors allow providing a custom partitioner when instantiating the producer.
+Note that the partitioner implementation must be serializable, as it will be transferred across Flink nodes.
+Also, keep in mind that any state in the partitioner will be lost on job failures, since the partitioner
+is not part of the producer's checkpointed state.
+
+It is also possible to avoid using any kind of partitioner entirely, and simply let Kafka partition
+the written records by their attached key (as determined for each record using the provided serialization schema).
+To do this, provide a `null` custom partitioner when instantiating the producer. It is important
+to provide `null` as the custom partitioner; as explained above, if a custom partitioner is not specified,
+the `FlinkFixedPartitioner` is used instead.
+
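The fixed subtask-to-partition mapping described above can be illustrated with a small standalone sketch. This is not the actual `FlinkFixedPartitioner` source and the class/method names are hypothetical; it only demonstrates the wrap-around arithmetic that maps each parallel sink subtask to one Kafka partition.

```java
// Sketch of the FlinkFixedPartitioner behaviour described above: each parallel
// sink subtask is mapped to exactly one Kafka partition, wrapping around when
// there are more subtasks than partitions. Names are hypothetical.
public class FixedMappingSketch {

    // Returns the Kafka partition a given sink subtask would write to.
    public static int partitionFor(int subtaskIndex, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("partitions must not be empty");
        }
        return partitions[subtaskIndex % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};
        // A parallelism-4 sink maps subtasks 0..3 to partitions 0, 1, 2, 0.
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("subtask " + subtask + " -> partition " + partitionFor(subtask, partitions));
        }
    }
}
```

Note that all records of one subtask land in one partition, so with fewer subtasks than partitions, some partitions receive no data at all.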
 ### Kafka Producers and Fault Tolerance
 
 #### Kafka 0.8
@@ -522,17 +535,22 @@ With Flink's checkpointing enabled, the `FlinkKafkaProducer09` and `FlinkKafkaPr
 can provide at-least-once delivery guarantees.
 
 Besides enabling Flink's checkpointing, you should also configure the setter
-methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately,
-as shown in the above examples in the previous section:
+methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately.
 
- * `setLogFailuresOnly(boolean)`: enabling this will let the producer log failures only
+ * `setLogFailuresOnly(boolean)`: by default, this is set to `false`.
+ Enabling this will let the producer only log failures
  instead of catching and rethrowing them. This essentially marks the record
  as having succeeded, even if it was never written to the target Kafka topic. This
  must be disabled for at-least-once.
- * `setFlushOnCheckpoint(boolean)`: with this enabled, Flink's checkpoints will wait for any
+ * `setFlushOnCheckpoint(boolean)`: by default, this is set to `false`.
+ With this enabled, Flink's checkpoints will wait for any
  on-the-fly records at the time of the checkpoint to be acknowledged by Kafka before
  succeeding the checkpoint. This ensures that all records before the checkpoint have
  been written to Kafka. This must be enabled for at-least-once.
+
+In conclusion, to configure the Kafka producer to have at-least-once guarantees for versions
+0.9 and 0.10, `setLogFailuresOnly` must be set to `false` and `setFlushOnCheckpoint` must be set
+to `true`.
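As a concrete sketch of this at-least-once setup (mirroring the example style used earlier in this page; `properties` and `stream` are assumed to be defined, with `properties` holding the usual `bootstrap.servers` configuration):

```java
// Sketch: at-least-once configuration for the 0.9/0.10 producers.
FlinkKafkaProducer09<String> myProducer = new FlinkKafkaProducer09<>(
        "my-topic", new SimpleStringSchema(), properties);

myProducer.setLogFailuresOnly(false);   // rethrow write failures instead of only logging them
myProducer.setFlushOnCheckpoint(true);  // wait for pending records before completing a checkpoint

stream.addSink(myProducer);
```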
 
 **Note**: By default, the number of retries is set to "0". This means that when `setLogFailuresOnly` is set to `false`,
 the producer fails immediately on errors, including leader changes. The value is set to "0" by default to avoid

http://git-wip-us.apache.org/repos/asf/flink/blob/71974895/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 184a2e7..21e3a10 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -32,6 +32,8 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -124,12 +126,20 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
         * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+        * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+        *
         * @param brokerList
         *                      Comma separated addresses of the brokers
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
+        *                      User defined key-less serialization schema.
         */
        public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
                this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
@@ -139,10 +149,18 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
         * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+        * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+        *
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
+        *                      User defined key-less serialization schema.
         * @param producerConfig
         *                      Properties with the producer configuration.
         */
@@ -151,15 +169,26 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
        }
 
        /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-        * the topic.
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+        * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+        * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+        * partitions in a round-robin fashion.
         *
         * @param topicId The topic to write data to
-        * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+        * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
         * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+        *                          If set to {@code null}, records will be distributed to Kafka partitions
+        *                          in a round-robin fashion.
         */
-       public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+       public FlinkKafkaProducer010(
+                       String topicId,
+                       SerializationSchema<T> serializationSchema,
+                       Properties producerConfig,
+                       @Nullable FlinkKafkaPartitioner<T> customPartitioner) {
+
                this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
        }
 
@@ -169,6 +198,14 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
         * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+        * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+        *
         * @param brokerList
         *                      Comma separated addresses of the brokers
         * @param topicId
@@ -184,6 +221,14 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
         * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+        * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+        *
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
@@ -196,11 +241,29 @@ public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
        }
 
        /**
-        * Create Kafka producer.
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+        * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
         *
-        * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+        * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+        * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+        * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+        * will be distributed to Kafka partitions in a round-robin fashion.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+        * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+        *                          If set to {@code null}, records will be partitioned by the key of each record
+        *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+        *                          are {@code null}, then records will be distributed to Kafka partitions in a
+        *                          round-robin fashion.
         */
-       public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
+       public FlinkKafkaProducer010(
+                       String topicId,
+                       KeyedSerializationSchema<T> serializationSchema,
+                       Properties producerConfig,
+                       @Nullable FlinkKafkaPartitioner<T> customPartitioner) {
+
                super(topicId, serializationSchema, producerConfig, customPartitioner);
        }
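The fallback described in this Javadoc (key present: partition by key; key absent: round-robin) can be sketched standalone as follows. The class and the simple hash are hypothetical illustrations of the documented behaviour, not Kafka's actual default partitioner, which uses murmur2 hashing.

```java
import java.util.Arrays;

// Sketch of the fallback behaviour when customPartitioner is null:
// records with a serialized key get a stable key-based partition, while
// key-less records are spread round-robin. Names are hypothetical.
public class KeyFallbackSketch {

    private int nextRoundRobin = 0;

    public int choosePartition(byte[] serializedKey, int numPartitions) {
        if (serializedKey == null) {
            // No key attached: rotate through the partitions.
            return (nextRoundRobin++) % numPartitions;
        }
        // Key attached: the same key always lands in the same partition.
        return (Arrays.hashCode(serializedKey) & 0x7fffffff) % numPartitions;
    }
}
```

The stable key-to-partition mapping is what makes the `null`-partitioner option useful when downstream consumers rely on per-key ordering within a partition.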
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71974895/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index b14e487..58355c9 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -277,13 +277,21 @@ public class FlinkKafkaProducer011<IN>
        }
 
        /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+        * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer011(String, SerializationSchema, Properties, Optional)} instead.
+        *
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
+        *                      User defined key-less serialization schema.
         * @param producerConfig
         *                      Properties with the producer configuration.
         */
@@ -296,15 +304,26 @@ public class FlinkKafkaProducer011<IN>
        }
 
        /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
-        * the topic.
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+        * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+        * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+        * partitions in a round-robin fashion.
         *
         * @param topicId The topic to write data to
-        * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+        * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
         * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+        *                          If a partitioner is not provided, records will be distributed to Kafka partitions
+        *                          in a round-robin fashion.
         */
-       public FlinkKafkaProducer011(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+       public FlinkKafkaProducer011(
+                       String topicId,
+                       SerializationSchema<IN> serializationSchema,
+                       Properties producerConfig,
+                       Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
+
                this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
        }
 
@@ -314,6 +333,14 @@ public class FlinkKafkaProducer011<IN>
         * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+        * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional)} instead.
+        *
         * @param brokerList
         *                      Comma separated addresses of the brokers
         * @param topicId
@@ -333,6 +360,14 @@ public class FlinkKafkaProducer011<IN>
         * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+        * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional)} instead.
+        *
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
@@ -352,12 +387,22 @@ public class FlinkKafkaProducer011<IN>
         * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+        * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer011(String, KeyedSerializationSchema, Properties, Optional, Semantic, int)} instead.
+        *
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
         *                      User defined serialization schema supporting key/value messages
         * @param producerConfig
         *                      Properties with the producer configuration.
+        * @param semantic
+        *                      Defines semantic that will be used by this producer (see {@link Semantic}).
         */
        public FlinkKafkaProducer011(
                        String topicId,
@@ -374,12 +419,22 @@ public class FlinkKafkaProducer011<IN>
 
 
        /**
-        * The main constructor for creating a FlinkKafkaProducer.
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+        * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+        * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+        * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+        * will be distributed to Kafka partitions in a round-robin fashion.
         *
         * @param defaultTopicId The default topic to write data to
         * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
         * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+        *                          If a partitioner is not provided, records will be partitioned by the key of each record
+        *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+        *                          are {@code null}, then records will be distributed to Kafka partitions in a
+        *                          round-robin fashion.
         */
        public FlinkKafkaProducer011(
                        String defaultTopicId,
@@ -396,12 +451,22 @@ public class FlinkKafkaProducer011<IN>
        }
 
        /**
-        * The main constructor for creating a FlinkKafkaProducer.
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+        * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
+        * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+        * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+        * will be distributed to Kafka partitions in a round-robin fashion.
         *
         * @param defaultTopicId The default topic to write data to
         * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
         * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
+        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+        *                          If a partitioner is not provided, records will be partitioned by the key of each record
+        *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
+        *                          are {@code null}, then records will be distributed to Kafka partitions in a
+        *                          round-robin fashion.
         * @param semantic Defines semantic that will be used by this producer (see {@link Semantic}).
         * @param kafkaProducersPoolSize Overwrite default KafkaProducers pool size (see {@link Semantic#EXACTLY_ONCE}).
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/71974895/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index d2f17d2..42fb892 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -25,6 +25,8 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -38,17 +40,26 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
 
        private static final long serialVersionUID = 1L;
 
-       // ------------------- Keyless serialization schema constructors ----------------------
+       // ------------------- Key-less serialization schema constructors ----------------------
+
       /**
         * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+        * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+        *
         * @param brokerList
         *                      Comma separated addresses of the brokers
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
+        *                      User defined key-less serialization schema.
         */
        public FlinkKafkaProducer08(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
                this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
@@ -58,10 +69,18 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
         * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
+        * the partitioner. This default partitioner maps each sink subtask to a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
+        *
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
+        *                      User defined key-less serialization schema.
         * @param producerConfig
         *                      Properties with the producer configuration.
         */
@@ -70,14 +89,26 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN>  {
        }
 
        /**
-        * The main constructor for creating a FlinkKafkaProducer.
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+        * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+        * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+        * partitions in a round-robin fashion.
         *
         * @param topicId The topic to write data to
-        * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
+        * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
         * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions.
+        * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+        *                          If set to {@code null}, records will be distributed to Kafka partitions
+        *                          in a round-robin fashion.
         */
-       public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
+       public FlinkKafkaProducer08(
+                       String topicId,
+                       SerializationSchema<IN> serializationSchema,
+                       Properties producerConfig,
+                       @Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
                this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
 
        }
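
The round-robin behaviour described above — key-less records with a `null` partitioner being spread evenly over partitions — can be sketched in plain Java. This is a simplified illustrative model, not Flink's or Kafka's actual implementation; the class and method names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model: when records carry no key and no custom partitioner
// is provided, the producer spreads them over partitions round-robin.
public class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final int numPartitions;

    public RoundRobinSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Each key-less record simply goes to the next partition in sequence.
    public int partitionFor(byte[] keylessRecord) {
        return counter.getAndIncrement() % numPartitions;
    }

    public static void main(String[] args) {
        RoundRobinSketch sketch = new RoundRobinSketch(3);
        for (int i = 0; i < 6; i++) {
            // cycles 0, 1, 2, 0, 1, 2 across successive records
            System.out.println(sketch.partitionFor(new byte[0]));
        }
    }
}
```

The upshot for users: with a key-less `SerializationSchema` and `customPartitioner = null`, partition assignment carries no relationship to record content.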
@@ -88,6 +119,14 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
         * Creates a FlinkKafkaProducer for a given topic. The sink produces 
its input to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} 
will be used as
+        * the partitioner. This default partitioner maps each sink subtask to 
a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up 
in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer08(String, KeyedSerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead.
+        *
         * @param brokerList
         *                      Comma separated addresses of the brokers
         * @param topicId
@@ -103,6 +142,14 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
         * Creates a FlinkKafkaProducer for a given topic. The sink produces 
its input to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} 
will be used as
+        * the partitioner. This default partitioner maps each sink subtask to 
a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up 
in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer08(String, KeyedSerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead.
+        *
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
@@ -115,14 +162,29 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
        }
 
        /**
-        * The main constructor for creating a FlinkKafkaProducer.
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces 
its input to
+        * the topic. It accepts a keyed {@link KeyedSerializationSchema} and 
possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * <p>If a partitioner is not provided, written records will be 
partitioned by the attached key of each
+        * record (as determined by {@link 
KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+        * have a key (i.e., {@link 
KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+        * will be distributed to Kafka partitions in a round-robin fashion.
         *
         * @param topicId The topic to write data to
         * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assining 
messages to Kafka partitions.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        *                          If set to {@code null}, records will be 
partitioned by the key of each record
+        *                          (determined by {@link 
KeyedSerializationSchema#serializeKey(Object)}). If the keys
+        *                          are {@code null}, then records will be 
distributed to Kafka partitions in a
+        *                          round-robin fashion.
         */
-       public FlinkKafkaProducer08(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<IN> customPartitioner) {
+       public FlinkKafkaProducer08(
+                       String topicId,
+                       KeyedSerializationSchema<IN> serializationSchema,
+                       Properties producerConfig,
+                       @Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
                super(topicId, serializationSchema, producerConfig, 
customPartitioner);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71974895/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 407bad5..19f445f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -25,6 +25,8 @@ import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 
+import javax.annotation.Nullable;
+
 import java.util.Properties;
 
 /**
@@ -38,31 +40,47 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
 
        private static final long serialVersionUID = 1L;
 
-       // ------------------- Keyless serialization schema constructors 
----------------------
+       // ------------------- Key-less serialization schema constructors 
----------------------
 
        /**
         * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} 
will be used as
+        * the partitioner. This default partitioner maps each sink subtask to 
a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up 
in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer09(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead.
+        *
         * @param brokerList
         *                      Comma separated addresses of the brokers
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
+        *                      User defined key-less serialization schema.
         */
        public FlinkKafkaProducer09(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
                this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
        }
 
        /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} 
will be used as
+        * the partitioner. This default partitioner maps each sink subtask to 
a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up 
in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer09(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead.
+        *
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
-        *                      User defined (keyless) serialization schema.
+        *                      User defined key-less serialization schema.
         * @param producerConfig
         *                      Properties with the producer configuration.
         */
@@ -71,15 +89,26 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
        }
 
        /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
-        * the topic.
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces 
its input to
+        * the topic. It accepts a key-less {@link SerializationSchema} and 
possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * <p>Since a key-less {@link SerializationSchema} is used, all records 
sent to Kafka will not have an
+        * attached key. Therefore, if a partitioner is also not provided, 
records will be distributed to Kafka
+        * partitions in a round-robin fashion.
         *
         * @param topicId The topic to write data to
-        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
+        * @param serializationSchema A key-less serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        *                          If set to {@code null}, records will be 
distributed to Kafka partitions
+        *                          in a round-robin fashion.
         */
-       public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> 
customPartitioner) {
+       public FlinkKafkaProducer09(
+                       String topicId,
+                       SerializationSchema<IN> serializationSchema,
+                       Properties producerConfig,
+                       @Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
                this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
 
        }
@@ -90,6 +119,14 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
         * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} 
will be used as
+        * the partitioner. This default partitioner maps each sink subtask to 
a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up 
in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead.
+        *
         * @param brokerList
         *                      Comma separated addresses of the brokers
         * @param topicId
@@ -105,6 +142,14 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
         * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
         * the topic.
         *
+        * <p>Using this constructor, the default {@link FlinkFixedPartitioner} 
will be used as
+        * the partitioner. This default partitioner maps each sink subtask to 
a single Kafka
+        * partition (i.e. all records received by a sink subtask will end up 
in the same
+        * Kafka partition).
+        *
+        * <p>To use a custom partitioner, please use
+        * {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead.
+        *
         * @param topicId
         *                      ID of the Kafka topic.
         * @param serializationSchema
@@ -117,15 +162,29 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
        }
 
        /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces 
its input to
+        * the topic. It accepts a keyed {@link KeyedSerializationSchema} and 
possibly a custom {@link FlinkKafkaPartitioner}.
+        *
+        * <p>If a partitioner is not provided, written records will be 
partitioned by the attached key of each
+        * record (as determined by {@link 
KeyedSerializationSchema#serializeKey(Object)}). If written records do not
+        * have a key (i.e., {@link 
KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
+        * will be distributed to Kafka partitions in a round-robin fashion.
         *
         * @param topicId The topic to write data to
         * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        *                          If set to {@code null}, records will be 
partitioned by the key of each record
+        *                          (determined by {@link 
KeyedSerializationSchema#serializeKey(Object)}). If the keys
+        *                          are {@code null}, then records will be 
distributed to Kafka partitions in a
+        *                          round-robin fashion.
         */
-       public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<IN> customPartitioner) {
+       public FlinkKafkaProducer09(
+                       String topicId,
+                       KeyedSerializationSchema<IN> serializationSchema,
+                       Properties producerConfig,
+                       @Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
+
                super(topicId, serializationSchema, producerConfig, 
customPartitioner);
        }
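
The `FlinkFixedPartitioner` behaviour referenced throughout these constructors — each sink subtask writing all of its records to a single Kafka partition — boils down to a fixed subtask-to-partition mapping. The following is a sketch of that idea under the stated assumptions, not a copy of Flink's implementation:

```java
// Sketch of the fixed mapping used by the default FlinkFixedPartitioner:
// subtask i always writes to partitions[i % partitions.length].
public class FixedPartitionerSketch {
    public static int partitionFor(int subtaskIndex, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("partitions must be non-empty");
        }
        return partitions[subtaskIndex % partitions.length];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};
        // With 5 sink subtasks and 3 partitions, subtasks 3 and 4 wrap
        // around to partitions 0 and 1; partitions beyond the sink
        // parallelism would receive no records at all.
        for (int subtask = 0; subtask < 5; subtask++) {
            System.out.println("subtask " + subtask + " -> partition "
                    + partitionFor(subtask, partitions));
        }
    }
}
```

This mapping explains the warning implicit in the Javadoc: with fewer sink subtasks than partitions, some partitions stay empty unless a custom partitioner (or key-based partitioning) is used instead.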
 
