AHeise commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r426275573



##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
##########
@@ -264,7 +264,7 @@ public FlinkKafkaConsumerBase(
         * @param properties - Kafka configuration properties to be adjusted
         * @param offsetCommitMode offset commit mode

Review comment:
       I was referring to "Kafka Shuffle Consumer Part " in the commit message. 
It could be symmetric to the producer part.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
##########
@@ -192,6 +176,29 @@ protected String getFetcherName() {
                return "Kafka Fetcher";
        }
 
+       protected void partitionConsumerRecordsHandler(
+                       List<ConsumerRecord<byte[], byte[]>> partitionRecords,
+                       KafkaTopicPartitionState<TopicPartition> partition) throws Exception {
+
+               for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+                       deserializer.deserialize(record, kafkaCollector);
+
+                       // emit the actual records. this also updates offset state atomically and emits
+                       // watermarks
+                       emitRecordsWithTimestamps(
+                               kafkaCollector.getRecords(),
+                               partition,
+                               record.offset(),
+                               record.timestamp());
+
+                       if (kafkaCollector.isEndOfStreamSignalled()) {
+                               // end of stream signaled
+                               running = false;
+                               break;
+                       }

Review comment:
       I don't see the issue; IntelliJ's automatic method extraction already helps:
   ```
   while (running) {
       ...
       for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
           if (handleRecord(partition, record)) {
               running = false;
               break;
           }
       }
   }
   ```
   and
   ```
   protected boolean handleRecord(
           KafkaTopicPartitionState<TopicPartition> partition,
           ConsumerRecord<byte[], byte[]> record) throws Exception {

       deserializer.deserialize(record, kafkaCollector);

       // emit the actual records. this also updates offset state atomically and emits
       // watermarks
       emitRecordsWithTimestamps(
               kafkaCollector.getRecords(),
               partition,
               record.offset(),
               record.timestamp());

       return kafkaCollector.isEndOfStreamSignalled();
   }
   ```
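
For illustration only, here is a minimal, self-contained sketch of the same extract-method shape outside of Flink. `HandleRecordSketch`, the `String` records, and the `"EOS"` marker are hypothetical stand-ins and are not part of `KafkaFetcher`; the sketch only shows the extracted handler returning the end-of-stream signal while the caller keeps ownership of the `running` flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the fetcher; not Flink code. */
public class HandleRecordSketch {
    /** Assumed end-of-stream marker, standing in for isEndOfStreamSignalled(). */
    static final String END_OF_STREAM = "EOS";

    private final List<String> emitted = new ArrayList<>();
    private boolean running = true;

    /** Handles a single record and returns true if end of stream was signalled. */
    boolean handleRecord(String record) {
        boolean endOfStream = END_OF_STREAM.equals(record);
        if (!endOfStream) {
            // stands in for deserializing and emitting the record
            emitted.add(record);
        }
        return endOfStream;
    }

    /** The loop owns the `running` flag; the handler only reports the signal. */
    void processBatch(List<String> records) {
        for (String record : records) {
            if (handleRecord(record)) {
                running = false;
                break;
            }
        }
    }

    List<String> emitted() {
        return emitted;
    }

    boolean isRunning() {
        return running;
    }

    public static void main(String[] args) {
        HandleRecordSketch fetcher = new HandleRecordSketch();
        fetcher.processBatch(Arrays.asList("a", "b", "EOS", "c"));
        // prints "[a, b] running=false"
        System.out.println(fetcher.emitted() + " running=" + fetcher.isRunning());
    }
}
```

With this shape a subclass can override the per-record handler without touching the loop or the `running` flag.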

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ *     - you would like to reuse the shuffle data and/or,
+ *     - you would like to avoid a full restart of a pipeline during failure recovery
+ *
+ * <p>Persisting shuffle is achieved by wrapping a {@link FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link FlinkKafkaShuffle}.
+ * Here is an example how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ *     StreamExecutionEnvironment env = ...         // create execution environment
+ *     DataStream<X> source = env.addSource(...)    // add data stream source
+ *     DataStream<Y> dataStream = ...               // some transformation(s) based on source
+ *
+ *     KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ *             .persistentKeyBy(                    // keyBy shuffle through kafka
+ *                     dataStream,                  // data stream to be shuffled
+ *                     topic,                       // Kafka topic written to
+ *                     producerParallelism,         // the number of tasks of a Kafka Producer
+ *                     numberOfPartitions,          // the number of partitions of the Kafka topic written to
+ *                     kafkaProperties,             // kafka properties for Kafka Producer and Consumer
+ *                     keySelector<Y, KEY>);        // key selector to retrieve key from `dataStream'
+ *
+ *     keyedStream.transform...                     // some other transformation(s)
+ *
+ *     KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ *             .readKeyBy(                          // Read the Kafka shuffle data again for other usages
+ *                     topic,                       // the topic of Kafka where data is persisted
+ *                     env,                         // execution environment, and it can be a new environment
+ *                     typeInformation<Y>,          // type information of the data persisted in Kafka
+ *                     kafkaProperties,             // kafka properties for Kafka Consumer
+ *                     keySelector<Y, KEY>);        // key selector to retrieve key
+ *
+ *     keyedStreamReuse.transform...                // some other transformation(s)
+ * }</pre>
+ *
+ * <p>Usage of {@link FlinkKafkaShuffle#persistentKeyBy} is similar to {@link DataStream#keyBy(KeySelector)}.
+ * The differences are:
+ *
+ * <p>1). Partitioning is done through {@link FlinkKafkaShuffleProducer}. {@link FlinkKafkaShuffleProducer} decides
+ *                     which partition a key goes when writing to Kafka
+ *
+ * <p>2). Shuffle data can be reused through {@link FlinkKafkaShuffle#readKeyBy}, as shown in the example above.
+ *
+ * <p>3). Job execution is decoupled by the persistent Kafka message bus. In the example, the job execution graph is
+ *                     decoupled to three regions: `KafkaShuffleProducer', `KafkaShuffleConsumer' and `KafkaShuffleConsumerReuse'
+ *                     through `PERSISTENT DATA` as shown below. If any region fails the execution, the other two keep progressing.
+ *
+ * <p><pre>
+ *     source -> ... KafkaShuffleProducer -> PERSISTENT DATA -> KafkaShuffleConsumer -> ...
+ *                                                |
+ *                                                | ----------> KafkaShuffleConsumerReuse -> ...
+ * </pre>
+ */
+@Experimental
+public class FlinkKafkaShuffle {
+       static final String PRODUCER_PARALLELISM = "producer parallelism";
+       static final String PARTITION_NUMBER = "partition number";
+
+       /**
+        * Uses Kafka as a message bus to persist keyBy shuffle.
+        *
+        * <p>Persisting keyBy shuffle is achieved by wrapping a {@link FlinkKafkaShuffleProducer} and
+        * {@link FlinkKafkaShuffleConsumer} together.
+        *
+        * <p>On the producer side, {@link FlinkKafkaShuffleProducer}
+        * is similar to {@link DataStream#keyBy(KeySelector)}. They use the same key group assignment function
+        * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to decide which partition a key goes.
+        * Hence, each producer task can potentially write to each Kafka partition based on where the key goes.
+        * Here, `numberOfPartitions` equals to the key group size.
+        * In the case of using {@link TimeCharacteristic#EventTime}, each producer task broadcasts its watermark
+        * to ALL of the Kafka partitions to make sure watermark information is propagated correctly.
+        *
+        * <p>On the consumer side, each consumer task should read partitions equal to the key group indices
+        * it is assigned. `numberOfPartitions` is the maximum parallelism of the consumer. This version only
+        * supports numberOfPartitions = consumerParallelism.
+        * In the case of using {@link TimeCharacteristic#EventTime}, a consumer task is responsible to emit
+        * watermarks. Watermarks are read from the corresponding Kafka partitions. Notice that a consumer task only starts
+        * to emit a watermark after reading at least one watermark from each producer task to make sure watermarks
+        * are monotonically increasing. Hence a consumer task needs to know `producerParallelism` as well.
+        *
+        * @see FlinkKafkaShuffle#writeKeyBy
+        * @see FlinkKafkaShuffle#readKeyBy
+        *
+        * @param dataStream            Data stream to be shuffled
+        * @param topic                 Kafka topic written to
+        * @param producerParallelism   Parallelism of producer
+        * @param numberOfPartitions    Number of partitions
+        * @param properties            Kafka properties
+        * @param keySelector           Key selector to retrieve key from `dataStream'
+        * @param <T>                   Type of the input data stream
+        * @param <K>                                   Type of key
+        */
+       public static <T, K> KeyedStream<T, K> persistentKeyBy(
+                       DataStream<T> dataStream,
+                       String topic,
+                       int producerParallelism,
+                       int numberOfPartitions,
+                       Properties properties,
+                       KeySelector<T, K> keySelector) {
+               // KafkaProducer#propsToMap uses Properties purely as a HashMap without considering the default properties
+               // So we have to flatten the default property to first level elements.
+               Properties kafkaProperties = PropertiesUtil.flatten(properties);
+               kafkaProperties.setProperty(PRODUCER_PARALLELISM, String.valueOf(producerParallelism));
+               kafkaProperties.setProperty(PARTITION_NUMBER, String.valueOf(numberOfPartitions));
+
+               StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
+
+               writeKeyBy(dataStream, topic, kafkaProperties, keySelector);
+               return readKeyBy(topic, env, dataStream.getType(), kafkaProperties, keySelector);
+       }
+
+       /**
+        * Uses Kafka as a message bus to persist keyBy shuffle.
+        *
+        * @param dataStream                    Data stream to be shuffled
+        * @param topic                                 Kafka topic written to
+        * @param producerParallelism   Parallelism of producer
+        * @param numberOfPartitions    Number of partitions
+        * @param properties                    Kafka properties
+        * @param fields                                Key positions from the input data stream
+        * @param <T>                                   Type of the input data stream
+        */
+       public static <T> KeyedStream<T, Tuple> persistentKeyBy(
+                       DataStream<T> dataStream,
+                       String topic,
+                       int producerParallelism,
+                       int numberOfPartitions,
+                       Properties properties,
+                       int... fields) {
+               return persistentKeyBy(
+                       dataStream,
+                       topic,
+                       producerParallelism,
+                       numberOfPartitions,
+                       properties,
+                       keySelector(dataStream, fields));
+       }
+
+       /**
+        * The write side of {@link FlinkKafkaShuffle#persistentKeyBy}.
+        *
+        * <p>This function contains a {@link FlinkKafkaShuffleProducer} to shuffle and persist data in Kafka.
+        * {@link FlinkKafkaShuffleProducer} uses the same key group assignment function
+        * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to decide which partition a key goes.
+        * Hence, each producer task can potentially write to each Kafka partition based on the key.
+        * Here, the number of partitions equals to the key group size.
+        * In the case of using {@link TimeCharacteristic#EventTime}, each producer task broadcasts each watermark
+        * to all of the Kafka partitions to make sure watermark information is propagated properly.
+        *
+        * <p>Attention: make sure kafkaProperties include
+        * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} and {@link FlinkKafkaShuffle#PARTITION_NUMBER} explicitly.
+        * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} is the parallelism of the producer.
+        * {@link FlinkKafkaShuffle#PARTITION_NUMBER} is the number of partitions.
+        * They are not necessarily the same and allowed to be set independently.
+        *
+        * @see FlinkKafkaShuffle#persistentKeyBy
+        * @see FlinkKafkaShuffle#readKeyBy

Review comment:
       These `@see` tags are usually added at the very top of a Javadoc.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+       /**
+        * Uses Kafka as a message bus to persist keyBy shuffle.

Review comment:
       Please copy the long explanation from above.

##########
File path: flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
##########
@@ -19,6 +19,7 @@
 
 import org.slf4j.Logger;

Review comment:
       I don't think adding Javadoc-style formatting to the commit message makes 
much sense. Please remove it. Sorry for noticing this so late.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+ * <p><pre>
+ *     source -> ... KafkaShuffleProducer -> PERSISTENT DATA -> 
KafkaShuffleConsumer -> ...
+ *                                                |
+ *                                                | ----------> 
KafkaShuffleConsumerReuse -> ...

Review comment:
       Really good.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and 
persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ *     - you would like to reuse the shuffle data and/or,
+ *     - you would like to avoid a full restart of a pipeline during failure 
recovery
+ *
+ * <p>Persisting shuffle is achieved by wrapping a {@link 
FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link 
FlinkKafkaShuffle}.
+ * Here is an example how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ *     StreamExecutionEnvironment env = ...                                    
// create execution environment
+ *     DataStream<X> source = env.addSource(...)                               
// add data stream source
+ *     DataStream<Y> dataStream = ...                                          
        // some transformation(s) based on source
+ *
+ *     KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ *             .persistentKeyBy(                                               
                        // keyBy shuffle through kafka
+ *                     dataStream,                                             
                                // data stream to be shuffled
+ *                     topic,                                                  
                                // Kafka topic written to
+ *                     producerParallelism,                                    
                // the number of tasks of a Kafka Producer
+ *                     numberOfPartitions,                                     
                        // the number of partitions of the Kafka topic written 
to
+ *                     kafkaProperties,                                        
                        // kafka properties for Kafka Producer and Consumer
+ *                     keySelector<Y, KEY>);                                   
                // key selector to retrieve key from `dataStream'
+ *
+ *     keyedStream.transform...                                                
                // some other transformation(s)
+ *
+ *     KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ *             .readKeyBy(                                                     
                                // Read the Kafka shuffle data again for other 
usages
+ *                     topic,                                                  
                                // the topic of Kafka where data is persisted
+ *                     env,                                                    
                                // execution environment, and it can be a new 
environment
+ *                     typeInformation<Y>,                                     
                        // type information of the data persisted in Kafka
+ *                     kafkaProperties,                                        
                        // kafka properties for Kafka Consumer
+ *                     keySelector<Y, KEY>);                                   
                // key selector to retrieve key
+ *
+ *     keyedStreamReuse.transform...                                           
        // some other transformation(s)
+ * }</pre>

Review comment:
       Really good to have an example, but I'd replace the type tokens with some 
real types.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> 
{
+       private static final Logger LOG = 
LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+       private final WatermarkHandler watermarkHandler;
+       // 
------------------------------------------------------------------------
+
+       /** The schema to convert between Kafka's byte messages, and Flink's 
objects. */
+       private final KafkaShuffleElementDeserializer<T> deserializer;
+
+       /** Serializer to serialize record. */
+       private final TypeSerializer<T> serializer;
+
+       /** The handover of data and exceptions between the consumer thread and 
the task thread. */
+       private final Handover handover;
+
+       /** The thread that runs the actual KafkaConsumer and hand the record 
batches to this fetcher. */
+       private final KafkaConsumerThread consumerThread;
+
+       /** Flag to mark the main work loop as alive. */
+       private volatile boolean running = true;
+
+       public KafkaShuffleFetcher(
+               SourceFunction.SourceContext<T> sourceContext,
+               Map<KafkaTopicPartition, Long> 
assignedPartitionsWithInitialOffsets,
+               SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+               SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+               ProcessingTimeService processingTimeProvider,
+               long autoWatermarkInterval,
+               ClassLoader userCodeClassLoader,
+               String taskNameWithSubtasks,
+               TypeSerializer<T> serializer,
+               Properties kafkaProperties,
+               long pollTimeout,
+               MetricGroup subtaskMetricGroup,
+               MetricGroup consumerMetricGroup,
+               boolean useMetrics,
+               int producerParallelism) throws Exception {
+               super(
+                       sourceContext,
+                       assignedPartitionsWithInitialOffsets,
+                       watermarksPeriodic,
+                       watermarksPunctuated,
+                       processingTimeProvider,
+                       autoWatermarkInterval,
+                       userCodeClassLoader,
+                       consumerMetricGroup,
+                       useMetrics);
+
+               this.deserializer = new KafkaShuffleElementDeserializer<>();
+               this.serializer = serializer;
+               this.handover = new Handover();
+               this.consumerThread = new KafkaConsumerThread(
+                       LOG,
+                       handover,
+                       kafkaProperties,
+                       unassignedPartitionsQueue,
+                       getFetcherName() + " for " + taskNameWithSubtasks,
+                       pollTimeout,
+                       useMetrics,
+                       consumerMetricGroup,
+                       subtaskMetricGroup);
+               this.watermarkHandler = new 
WatermarkHandler(producerParallelism);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Fetcher work methods
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void runFetchLoop() throws Exception {
+               try {
+                       final Handover handover = this.handover;
+
+                       // kick off the actual Kafka consumer
+                       consumerThread.start();
+
+                       while (running) {
+                               // this blocks until we get the next records
+                               // it automatically re-throws exceptions 
encountered in the consumer thread
+                               final ConsumerRecords<byte[], byte[]> records = 
handover.pollNext();
+
+                               // get the records for each topic partition
+                               for (KafkaTopicPartitionState<TopicPartition> 
partition : subscribedPartitionStates()) {
+                                       List<ConsumerRecord<byte[], byte[]>> 
partitionRecords =
+                                               
records.records(partition.getKafkaPartitionHandle());
+
+                                       for (ConsumerRecord<byte[], byte[]> 
record : partitionRecords) {
+                                               final KafkaShuffleElement<T> 
element = deserializer.deserialize(serializer, record);
+
+                                               // TODO: do we need to check 
the end of stream if reaching the end watermark?
+
+                                               if (element.isRecord()) {
+                                                       // timestamp is 
inherent from upstream
+                                                       // If using 
ProcessTime, timestamp is going to be ignored (upstream does not include 
timestamp as well)
+                                                       // If using 
IngestionTime, timestamp is going to be overwritten
+                                                       // If using EventTime, 
timestamp is going to be used
+                                                       synchronized 
(checkpointLock) {
+                                                               
KafkaShuffleRecord<T> elementAsRecord = element.asRecord();
+                                                               
sourceContext.collectWithTimestamp(
+                                                                       
elementAsRecord.value,
+                                                                       
elementAsRecord.timestamp == null ? record.timestamp() : 
elementAsRecord.timestamp);
+                                                               
partition.setOffset(record.offset());
+                                                       }
+                                               } else if 
(element.isWatermark()) {
+                                                       final 
KafkaShuffleWatermark watermark = element.asWatermark();
+                                                       Optional<Watermark> 
newWatermark = watermarkHandler.checkAndGetNewWatermark(watermark);
+                                                       
newWatermark.ifPresent(sourceContext::emitWatermark);
+                                               }
+                                       }
+                               }
+                       }
+               }
+               finally {
+                       // this signals the consumer thread that no more work 
is to be done
+                       consumerThread.shutdown();
+               }
+
+               // on a clean exit, wait for the runner thread
+               try {
+                       consumerThread.join();
+               }
+               catch (InterruptedException e) {
+                       // may be the result of a wake-up interruption after an 
exception.
+                       // we ignore this here and only restore the 
interruption state
+                       Thread.currentThread().interrupt();
+               }
+       }
+
+       @Override
+       public void cancel() {
+               // flag the main thread to exit. A thread interrupt will come 
anyways.
+               running = false;
+               handover.close();
+               consumerThread.shutdown();
+       }
+
+       @Override
+       protected TopicPartition createKafkaPartitionHandle(KafkaTopicPartition 
partition) {
+               return new TopicPartition(partition.getTopic(), 
partition.getPartition());
+       }
+
+       @Override
+       protected void doCommitInternalOffsetsToKafka(
+               Map<KafkaTopicPartition, Long> offsets,
+               @Nonnull KafkaCommitCallback commitCallback) throws Exception {
+
+               @SuppressWarnings("unchecked")
+               List<KafkaTopicPartitionState<TopicPartition>> partitions = 
subscribedPartitionStates();
+
+               Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new 
HashMap<>(partitions.size());
+
+               for (KafkaTopicPartitionState<TopicPartition> partition : 
partitions) {
+                       Long lastProcessedOffset = 
offsets.get(partition.getKafkaTopicPartition());
+                       if (lastProcessedOffset != null) {
+                               checkState(lastProcessedOffset >= 0, "Illegal 
offset value to commit");
+
+                               // committed offsets through the KafkaConsumer 
need to be 1 more than the last processed offset.
+                               // This does not affect Flink's 
checkpoints/saved state.
+                               long offsetToCommit = lastProcessedOffset + 1;
+
+                               
offsetsToCommit.put(partition.getKafkaPartitionHandle(), new 
OffsetAndMetadata(offsetToCommit));
+                               partition.setCommittedOffset(offsetToCommit);
+                       }
+               }
+
+               // record the work to be committed by the main consumer thread 
and make sure the consumer notices that
+               consumerThread.setOffsetsToCommit(offsetsToCommit, 
commitCallback);
+       }
+
+       private String getFetcherName() {
+               return "Kafka Shuffle Fetcher";
+       }
+
+       private abstract static class KafkaShuffleElement<T> {
+
+               public boolean isRecord() {
+                       return getClass() == KafkaShuffleRecord.class;
+               }
+
+               boolean isWatermark() {
+                       return getClass() == KafkaShuffleWatermark.class;
+               }
+
+               KafkaShuffleRecord<T> asRecord() {
+                       return (KafkaShuffleRecord<T>) this;
+               }
+
+               KafkaShuffleWatermark asWatermark() {
+                       return (KafkaShuffleWatermark) this;
+               }
+       }
+
+       private static class KafkaShuffleWatermark<T> extends 
KafkaShuffleElement<T> {
+               final int subtask;
+               final long watermark;
+
+               KafkaShuffleWatermark(int subtask, long watermark) {
+                       this.subtask = subtask;
+                       this.watermark = watermark;
+               }
+       }
+
+       private static class KafkaShuffleRecord<T> extends 
KafkaShuffleElement<T> {
+               final T value;
+               final Long timestamp;
+
+               KafkaShuffleRecord(T value) {
+                       this.value = value;
+                       this.timestamp = null;
+               }
+
+               KafkaShuffleRecord(long timestamp, T value) {
+                       this.value = value;
+                       this.timestamp = timestamp;
+               }
+       }
+
+       private static class KafkaShuffleElementDeserializer<T> implements 
Serializable {
+               private transient DataInputDeserializer dis;
+
+               KafkaShuffleElementDeserializer() {
+                       this.dis = new DataInputDeserializer();
+               }
+
+               KafkaShuffleElement<T> deserialize(TypeSerializer<T> 
serializer, ConsumerRecord<byte[], byte[]> record)
+                       throws Exception {
+                       byte[] value = record.value();
+                       dis.setBuffer(value);
+                       int tag = IntSerializer.INSTANCE.deserialize(dis);
+
+                       if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+                               return new 
KafkaShuffleRecord<>(serializer.deserialize(dis));
+                       } else if (tag == TAG_REC_WITH_TIMESTAMP) {
+                               return new 
KafkaShuffleRecord<>(LongSerializer.INSTANCE.deserialize(dis), 
serializer.deserialize(dis));
+                       } else if (tag == TAG_WATERMARK) {
+                               return new KafkaShuffleWatermark<>(
+                                       
IntSerializer.INSTANCE.deserialize(dis), 
LongSerializer.INSTANCE.deserialize(dis));
+                       }
+
+                       throw new UnsupportedOperationException("Unsupported 
tag format");
+               }
+       }
+
+       /**
+        * WatermarkHandler to generate watermarks.
+        */
+       private static class WatermarkHandler {
+               private final int producerParallelism;
+               private final Map<Integer, Long> subtaskWatermark;
+
+               private long currentMinWatermark = Long.MIN_VALUE;
+
+               WatermarkHandler(int numberOfSubtask) {
+                       this.producerParallelism = numberOfSubtask;
+                       this.subtaskWatermark = new HashMap<>(numberOfSubtask);
+               }
+
+               public Optional<Watermark> 
checkAndGetNewWatermark(KafkaShuffleWatermark newWatermark) {
+                       // watermarks is incremental for the same partition and 
PRODUCER subtask
+                       Long currentSubTaskWatermark = 
subtaskWatermark.get(newWatermark.subtask);
+
+                       Preconditions.checkState(
+                               (currentSubTaskWatermark == null) || 
(currentSubTaskWatermark <= newWatermark.watermark),
+                               "Watermark should always increase");
+
+                       subtaskWatermark.put(newWatermark.subtask, 
newWatermark.watermark);
+
+                       if (subtaskWatermark.values().size() < 
producerParallelism) {
+                               return Optional.empty();
+                       }

Review comment:
       Clarified offline: the end-of-stream check is not needed because the MAX 
watermark is propagated. If we want to support bounded sources all the way, we'd 
need to handle `EndOfPartitionEvents`, but we can do that in a later version. 
`KafkaShuffle` will probably almost never be used with bounded sources.
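
   For reference, a minimal standalone sketch of why propagating the MAX 
watermark is enough. It mirrors the per-subtask bookkeeping in 
`WatermarkHandler` using plain Java types only. The class name 
`MinWatermarkTracker` and the method `update` are hypothetical. The 
"take the minimum and emit only when it advances" step is an assumption, 
because the quoted hunk stops before that part of the method.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical stand-in for WatermarkHandler; no Flink or Kafka types involved. */
class MinWatermarkTracker {
    private final int producerParallelism;
    private final Map<Integer, Long> subtaskWatermark = new HashMap<>();
    private long currentMinWatermark = Long.MIN_VALUE;

    MinWatermarkTracker(int producerParallelism) {
        this.producerParallelism = producerParallelism;
    }

    /** Records a watermark from one producer subtask; returns a value only if the minimum advanced. */
    OptionalLong update(int subtask, long watermark) {
        Long previous = subtaskWatermark.get(subtask);
        if (previous != null && previous > watermark) {
            throw new IllegalStateException("Watermark should always increase");
        }
        subtaskWatermark.put(subtask, watermark);

        // Nothing can be emitted until every producer subtask has reported once.
        if (subtaskWatermark.size() < producerParallelism) {
            return OptionalLong.empty();
        }

        // Assumption: the combined watermark is the minimum over all producer subtasks.
        long min = Collections.min(subtaskWatermark.values());
        if (min > currentMinWatermark) {
            currentMinWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        System.out.println(tracker.update(0, 10L));            // only one subtask seen so far
        System.out.println(tracker.update(1, 5L));             // minimum is now 5
        System.out.println(tracker.update(0, Long.MAX_VALUE)); // subtask 1 still holds it back
        System.out.println(tracker.update(1, Long.MAX_VALUE)); // both finished, MAX is emitted
    }
}
```

   Under these assumptions, once every producer subtask has written 
`Long.MAX_VALUE`, the consumer side emits `Long.MAX_VALUE` as well, so 
downstream operators see the final watermark without a separate 
end-of-stream check.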

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and 
persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ *     - you would like to reuse the shuffle data and/or,
+ *     - you would like to avoid a full restart of a pipeline during failure 
recovery
+ *
+ * <p>Persisting shuffle is achieved by wrapping a {@link 
FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link 
FlinkKafkaShuffle}.
+ * Here is an example how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ *     StreamExecutionEnvironment env = ...                                    
// create execution environment
+ *     DataStream<X> source = env.addSource(...)                               
// add data stream source
+ *     DataStream<Y> dataStream = ...                                          
        // some transformation(s) based on source
+ *
+ *     KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ *             .persistentKeyBy(                                               
                        // keyBy shuffle through kafka
+ *                     dataStream,                                             
                                // data stream to be shuffled
+ *                     topic,                                                  
                                // Kafka topic written to
+ *                     producerParallelism,                                    
                // the number of tasks of a Kafka Producer
+ *                     numberOfPartitions,                                     
                        // the number of partitions of the Kafka topic written 
to
+ *                     kafkaProperties,                                        
                        // kafka properties for Kafka Producer and Consumer
+ *                     keySelector<Y, KEY>);                                   
                // key selector to retrieve key from `dataStream'
+ *
+ *     keyedStream.transform...                                                
                // some other transformation(s)
+ *
+ *     KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ *             .readKeyBy(                                                     
                                // Read the Kafka shuffle data again for other 
usages
+ *                     topic,                                                  
                                // the topic of Kafka where data is persisted
+ *                     env,                                                    
                                // execution environment, and it can be a new 
environment
+ *                     typeInformation<Y>,                                     
                        // type information of the data persisted in Kafka
+ *                     kafkaProperties,                                        
                        // kafka properties for Kafka Consumer
+ *                     keySelector<Y, KEY>);                                   
                // key selector to retrieve key
+ *
+ *     keyedStreamReuse.transform...                                           
        // some other transformation(s)
+ * }</pre>
+ *
+ * <p>Usage of {@link FlinkKafkaShuffle#persistentKeyBy} is similar to {@link 
DataStream#keyBy(KeySelector)}.
+ * The differences are:
+ *
+ * <p>1). Partitioning is done through {@link FlinkKafkaShuffleProducer}. 
{@link FlinkKafkaShuffleProducer} decides
+ *                     which partition a key goes when writing to Kafka
+ *
+ * <p>2). Shuffle data can be reused through {@link 
FlinkKafkaShuffle#readKeyBy}, as shown in the example above.

Review comment:
       can be reused **in an independent job**
   
   (Flink already supports streaming a data stream into multiple downstream 
operators.)

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and 
persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ *     - you would like to reuse the shuffle data and/or,
+ *     - you would like to avoid a full restart of a pipeline during failure 
recovery
+ *
+ * <p>Persisting shuffle is achieved by wrapping a {@link 
FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link 
FlinkKafkaShuffle}.
+ * Here is an example how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ *     StreamExecutionEnvironment env = ...                                    
// create execution environment
+ *     DataStream<X> source = env.addSource(...)                               
// add data stream source
+ *     DataStream<Y> dataStream = ...                                          
        // some transformation(s) based on source
+ *
+ *     KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ *             .persistentKeyBy(                                               
                        // keyBy shuffle through kafka
+ *                     dataStream,                                             
                                // data stream to be shuffled
+ *                     topic,                                                  
                                // Kafka topic written to
+ *                     producerParallelism,                                    
                // the number of tasks of a Kafka Producer
+ *                     numberOfPartitions,                                     
                        // the number of partitions of the Kafka topic written 
to
+ *                     kafkaProperties,                                        
                        // kafka properties for Kafka Producer and Consumer
+ *                     keySelector<Y, KEY>);                                   
                // key selector to retrieve key from `dataStream'
+ *
+ *     keyedStream.transform...                                                
                // some other transformation(s)
+ *
+ *     KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ *             .readKeyBy(                                                     
                                // Read the Kafka shuffle data again for other 
usages
+ *                     topic,                                                  
                                // the topic of Kafka where data is persisted
+ *                     env,                                                    
                                // execution environment, and it can be a new 
environment
+ *                     typeInformation<Y>,                                     
                        // type information of the data persisted in Kafka
+ *                     kafkaProperties,                                        
                        // kafka properties for Kafka Consumer
+ *                     keySelector<Y, KEY>);                                   
                // key selector to retrieve key
+ *
+ *     keyedStreamReuse.transform...                                           
        // some other transformation(s)
+ * }</pre>
+ *
+ * <p>Usage of {@link FlinkKafkaShuffle#persistentKeyBy} is similar to {@link 
DataStream#keyBy(KeySelector)}.
+ * The differences are:
+ *
+ * <p>1). Partitioning is done through {@link FlinkKafkaShuffleProducer}. 
{@link FlinkKafkaShuffleProducer} decides
+ *                     which partition a key goes when writing to Kafka
+ *
+ * <p>2). Shuffle data can be reused through {@link 
FlinkKafkaShuffle#readKeyBy}, as shown in the example above.
+ *
+ * <p>3). Job execution is decoupled by the persistent Kafka message bus. In 
the example, the job execution graph is
+ *                     decoupled to three regions: `KafkaShuffleProducer', 
`KafkaShuffleConsumer' and `KafkaShuffleConsumerReuse'
+ *                     through `PERSISTENT DATA` as shown below. If any region 
fails the execution, the other two keep progressing.
+ *
+ * <p><pre>
+ *     source -> ... KafkaShuffleProducer -> PERSISTENT DATA -> 
KafkaShuffleConsumer -> ...
+ *                                                |
+ *                                                | ----------> 
KafkaShuffleConsumerReuse -> ...
+ * </pre>
+ */
+@Experimental
+public class FlinkKafkaShuffle {
+       static final String PRODUCER_PARALLELISM = "producer parallelism";
+       static final String PARTITION_NUMBER = "partition number";
+
+       /**
+        * Uses Kafka as a message bus to persist keyBy shuffle.
+        *
+        * <p>Persisting keyBy shuffle is achieved by wrapping a {@link 
FlinkKafkaShuffleProducer} and
+        * {@link FlinkKafkaShuffleConsumer} together.
+        *
+        * <p>On the producer side, {@link FlinkKafkaShuffleProducer}
+        * is similar to {@link DataStream#keyBy(KeySelector)}. They use the 
same key group assignment function
+        * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to 
decide which partition a key goes.
+        * Hence, each producer task can potentially write to each Kafka 
partition based on where the key goes.
+        * Here, `numberOfPartitions` equals to the key group size.
+        * In the case of using {@link TimeCharacteristic#EventTime}, each 
producer task broadcasts its watermark
+        * to ALL of the Kafka partitions to make sure watermark information is 
propagated correctly.
+        *
+        * <p>On the consumer side, each consumer task should read partitions 
equal to the key group indices
+        * it is assigned. `numberOfPartitions` is the maximum parallelism of 
the consumer. This version only
+        * supports numberOfPartitions = consumerParallelism.
+        * In the case of using {@link TimeCharacteristic#EventTime}, a 
consumer task is responsible to emit
+        * watermarks. Watermarks are read from the corresponding Kafka 
partitions. Notice that a consumer task only starts
+        * to emit a watermark after reading at least one watermark from each 
producer task to make sure watermarks
+        * are monotonically increasing. Hence a consumer task needs to know 
`producerParallelism` as well.
+        *
+        * @see FlinkKafkaShuffle#writeKeyBy
+        * @see FlinkKafkaShuffle#readKeyBy
+        *
+        * @param dataStream                    Data stream to be shuffled
+        * @param topic                                 Kafka topic written to
+        * @param producerParallelism   Parallelism of producer
+        * @param numberOfPartitions    Number of partitions
+        * @param properties                    Kafka properties
+        * @param keySelector                   Key selector to retrieve key 
from `dataStream'
+        * @param <T>                                   Type of the input data 
stream
+        * @param <K>                                   Type of key
+        */
+       public static <T, K> KeyedStream<T, K> persistentKeyBy(
+                       DataStream<T> dataStream,
+                       String topic,
+                       int producerParallelism,
+                       int numberOfPartitions,
+                       Properties properties,
+                       KeySelector<T, K> keySelector) {
+               // KafkaProducer#propsToMap uses Properties purely as a HashMap 
without considering the default properties
+               // So we have to flatten the default property to first level 
elements.
+               Properties kafkaProperties = PropertiesUtil.flatten(properties);
+               kafkaProperties.setProperty(PRODUCER_PARALLELISM, 
String.valueOf(producerParallelism));
+               kafkaProperties.setProperty(PARTITION_NUMBER, 
String.valueOf(numberOfPartitions));
+
+               StreamExecutionEnvironment env = 
dataStream.getExecutionEnvironment();
+
+               writeKeyBy(dataStream, topic, kafkaProperties, keySelector);
+               return readKeyBy(topic, env, dataStream.getType(), 
kafkaProperties, keySelector);
+       }
+
+       /**
+        * Uses Kafka as a message bus to persist keyBy shuffle.
+        *
+        * @param dataStream                    Data stream to be shuffled
+        * @param topic                                 Kafka topic written to
+        * @param producerParallelism   Parallelism of producer
+        * @param numberOfPartitions    Number of partitions
+        * @param properties                    Kafka properties
+        * @param fields                                Key positions from the 
input data stream
+        * @param <T>                                   Type of the input data 
stream
+        */
+       public static <T> KeyedStream<T, Tuple> persistentKeyBy(
+                       DataStream<T> dataStream,
+                       String topic,
+                       int producerParallelism,
+                       int numberOfPartitions,
+                       Properties properties,
+                       int... fields) {
+               return persistentKeyBy(
+                       dataStream,
+                       topic,
+                       producerParallelism,
+                       numberOfPartitions,
+                       properties,
+                       keySelector(dataStream, fields));
+       }
+
+       /**
+        * The write side of {@link FlinkKafkaShuffle#persistentKeyBy}.
+        *
+        * <p>This function contains a {@link FlinkKafkaShuffleProducer} to 
shuffle and persist data in Kafka.
+        * {@link FlinkKafkaShuffleProducer} uses the same key group assignment 
function
+        * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to 
decide which partition a key goes.
+        * Hence, each producer task can potentially write to each Kafka 
partition based on the key.
+        * Here, the number of partitions equals to the key group size.
+        * In the case of using {@link TimeCharacteristic#EventTime}, each 
producer task broadcasts each watermark
+        * to all of the Kafka partitions to make sure watermark information is 
propagated properly.
+        *
+        * <p>Attention: make sure kafkaProperties include
+        * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} and {@link 
FlinkKafkaShuffle#PARTITION_NUMBER} explicitly.
+        * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} is the parallelism of 
the producer.
+        * {@link FlinkKafkaShuffle#PARTITION_NUMBER} is the number of 
partitions.
+        * They are not necessarily the same and allowed to be set 
independently.

Review comment:
       very good
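
The watermark handling described in the quoted Javadoc (each producer task broadcasts its watermarks to all partitions, and a consumer task emits only after it has seen at least one watermark from every producer task) can be sketched as follows. This is an illustration for a single partition, not the PR's code: `ShuffleWatermarkSketch`, `onWatermark`, and the use of `Long.MIN_VALUE` as a "nothing to emit" marker are assumptions made for the example.

```java
import java.util.Arrays;

// Illustrative sketch only; not the consumer implementation from the PR.
final class ShuffleWatermarkSketch {
    private static final long UNSEEN = Long.MIN_VALUE;

    // Latest watermark received from each producer subtask.
    private final long[] latestPerProducer;
    private long lastEmitted = Long.MIN_VALUE;

    ShuffleWatermarkSketch(int producerParallelism) {
        latestPerProducer = new long[producerParallelism];
        Arrays.fill(latestPerProducer, UNSEEN);
    }

    /**
     * Records a watermark from one producer subtask.
     * Returns the watermark to emit downstream, or Long.MIN_VALUE if nothing
     * should be emitted (a producer is still unseen, or the minimum did not advance).
     */
    long onWatermark(int producerSubtask, long watermark) {
        latestPerProducer[producerSubtask] =
            Math.max(latestPerProducer[producerSubtask], watermark);

        long min = Long.MAX_VALUE;
        for (long w : latestPerProducer) {
            if (w == UNSEEN) {
                return Long.MIN_VALUE; // wait until every producer has reported
            }
            min = Math.min(min, w);
        }
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        ShuffleWatermarkSketch consumer = new ShuffleWatermarkSketch(2);
        System.out.println(consumer.onWatermark(0, 10L));
        System.out.println(consumer.onWatermark(1, 5L));
    }
}
```

Emitting the minimum across producers keeps the emitted watermarks monotonically increasing, and the array size is the reason the consumer has to know `producerParallelism`.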

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleTestBase.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+
+import org.junit.BeforeClass;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+
+/**
+ * Base Test Class for KafkaShuffle.
+ */
+public class KafkaShuffleTestBase extends KafkaConsumerTestBase {
+       static final long INIT_TIMESTAMP = System.currentTimeMillis();

Review comment:
       As I said before, if I want to compare two shuffle topics created by two 
commits to track down a bug, a timestamp that changes on every run will be a 
real pain.
   Tests don't need realistic data, but they should be realistic in terms of 
workload (or queries).
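
One way to address this, sketched under assumptions: derive all record timestamps from a fixed constant instead of `System.currentTimeMillis()`, so that two runs write identical data to the topic. The class `FixedTimestampSketch`, the helper `timestamps`, and the base value `1_000L` are hypothetical and only illustrate the idea; they are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; names and the base value are assumptions.
final class FixedTimestampSketch {

    // Fixed, arbitrary base timestamp: identical on every run and every commit.
    static final long INIT_TIMESTAMP = 1_000L;

    // Generates deterministic event timestamps for numElements test records.
    static List<Long> timestamps(int numElements) {
        List<Long> result = new ArrayList<>(numElements);
        for (int i = 0; i < numElements; i++) {
            result.add(INIT_TIMESTAMP + i);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(timestamps(3));
    }
}
```

Because nothing depends on the wall clock, topics produced by two different commits can be compared record by record.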

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and 
persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ *     - you would like to reuse the shuffle data and/or,
+ *     - you would like to avoid a full restart of a pipeline during failure 
recovery
+ *
+ * <p>Persisting shuffle is achieved by wrapping a {@link 
FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link 
FlinkKafkaShuffle}.
+ * Here is an example how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ *     StreamExecutionEnvironment env = ...                                    
// create execution environment
+ *     DataStream<X> source = env.addSource(...)                               
// add data stream source
+ *     DataStream<Y> dataStream = ...                                          
        // some transformation(s) based on source
+ *
+ *     KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ *             .persistentKeyBy(                                               
                        // keyBy shuffle through kafka
+ *                     dataStream,                                             
                                // data stream to be shuffled
+ *                     topic,                                                  
                                // Kafka topic written to
+ *                     producerParallelism,                                    
                // the number of tasks of a Kafka Producer
+ *                     numberOfPartitions,                                     
                        // the number of partitions of the Kafka topic written 
to
+ *                     kafkaProperties,                                        
                        // kafka properties for Kafka Producer and Consumer
+ *                     keySelector<Y, KEY>);                                   
                // key selector to retrieve key from `dataStream'
+ *
+ *     keyedStream.transform...                                                
                // some other transformation(s)
+ *
+ *     KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ *             .readKeyBy(                                                     
                                // Read the Kafka shuffle data again for other 
usages
+ *                     topic,                                                  
                                // the topic of Kafka where data is persisted
+ *                     env,                                                    
                                // execution environment, and it can be a new 
environment
+ *                     typeInformation<Y>,                                     
                        // type information of the data persisted in Kafka
+ *                     kafkaProperties,                                        
                        // kafka properties for Kafka Consumer
+ *                     keySelector<Y, KEY>);                                   
                // key selector to retrieve key
+ *
+ *     keyedStreamReuse.transform...                                           
        // some other transformation(s)
+ * }</pre>
+ *
+ * <p>Usage of {@link FlinkKafkaShuffle#persistentKeyBy} is similar to {@link 
DataStream#keyBy(KeySelector)}.
+ * The differences are:
+ *
+ * <p>1). Partitioning is done through {@link FlinkKafkaShuffleProducer}. 
{@link FlinkKafkaShuffleProducer} decides
+ *                     which partition a key goes when writing to Kafka
+ *
+ * <p>2). Shuffle data can be reused through {@link 
FlinkKafkaShuffle#readKeyBy}, as shown in the example above.
+ *
+ * <p>3). Job execution is decoupled by the persistent Kafka message bus. In 
the example, the job execution graph is
+ *                     decoupled to three regions: `KafkaShuffleProducer', 
`KafkaShuffleConsumer' and `KafkaShuffleConsumerReuse'
+ *                     through `PERSISTENT DATA` as shown below. If any region 
fails the execution, the other two keep progressing.
+ *
+ * <p><pre>
+ *     source -> ... KafkaShuffleProducer -> PERSISTENT DATA -> 
KafkaShuffleConsumer -> ...
+ *                                                |
+ *                                                | ----------> 
KafkaShuffleConsumerReuse -> ...
+ * </pre>
+ */
+@Experimental
+public class FlinkKafkaShuffle {
+       static final String PRODUCER_PARALLELISM = "producer parallelism";
+       static final String PARTITION_NUMBER = "partition number";
+
+       /**
+        * Uses Kafka as a message bus to persist keyBy shuffle.
+        *
+        * <p>Persisting keyBy shuffle is achieved by wrapping a {@link 
FlinkKafkaShuffleProducer} and
+        * {@link FlinkKafkaShuffleConsumer} together.
+        *
+        * <p>On the producer side, {@link FlinkKafkaShuffleProducer}
+        * is similar to {@link DataStream#keyBy(KeySelector)}. They use the 
same key group assignment function
+        * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to 
decide which partition a key goes.
+        * Hence, each producer task can potentially write to each Kafka 
partition based on where the key goes.
+        * Here, `numberOfPartitions` equals to the key group size.
+        * In the case of using {@link TimeCharacteristic#EventTime}, each 
producer task broadcasts its watermark
+        * to ALL of the Kafka partitions to make sure watermark information is 
propagated correctly.
+        *
+        * <p>On the consumer side, each consumer task should read partitions 
equal to the key group indices
+        * it is assigned. `numberOfPartitions` is the maximum parallelism of 
the consumer. This version only
+        * supports numberOfPartitions = consumerParallelism.
+        * In the case of using {@link TimeCharacteristic#EventTime}, a 
consumer task is responsible to emit
+        * watermarks. Watermarks are read from the corresponding Kafka 
partitions. Notice that a consumer task only starts
+        * to emit a watermark after reading at least one watermark from each 
producer task to make sure watermarks
+        * are monotonically increasing. Hence a consumer task needs to know 
`producerParallelism` as well.
+        *
+        * @see FlinkKafkaShuffle#writeKeyBy
+        * @see FlinkKafkaShuffle#readKeyBy
+        *
+        * @param dataStream                    Data stream to be shuffled
+        * @param topic                                 Kafka topic written to
+        * @param producerParallelism   Parallelism of producer
+        * @param numberOfPartitions    Number of partitions
+        * @param properties                    Kafka properties
+        * @param keySelector                   Key selector to retrieve key 
from `dataStream'
+        * @param <T>                                   Type of the input data 
stream
+        * @param <K>                                   Type of key
+        */
+       public static <T, K> KeyedStream<T, K> persistentKeyBy(
+                       DataStream<T> dataStream,
+                       String topic,
+                       int producerParallelism,
+                       int numberOfPartitions,
+                       Properties properties,
+                       KeySelector<T, K> keySelector) {
+               // KafkaProducer#propsToMap uses Properties purely as a HashMap 
without considering the default properties
+               // So we have to flatten the default property to first level 
elements.
+               Properties kafkaProperties = PropertiesUtil.flatten(properties);
+               kafkaProperties.setProperty(PRODUCER_PARALLELISM, 
String.valueOf(producerParallelism));
+               kafkaProperties.setProperty(PARTITION_NUMBER, 
String.valueOf(numberOfPartitions));
+
+               StreamExecutionEnvironment env = 
dataStream.getExecutionEnvironment();
+
+               writeKeyBy(dataStream, topic, kafkaProperties, keySelector);
+               return readKeyBy(topic, env, dataStream.getType(), 
kafkaProperties, keySelector);
+       }
+
+       /**
+        * Uses Kafka as a message bus to persist keyBy shuffle.
+        *
+        * @param dataStream                    Data stream to be shuffled
+        * @param topic                                 Kafka topic written to
+        * @param producerParallelism   Parallelism of producer
+        * @param numberOfPartitions    Number of partitions
+        * @param properties                    Kafka properties
+        * @param fields                                Key positions from the 
input data stream
+        * @param <T>                                   Type of the input data 
stream
+        */
+       public static <T> KeyedStream<T, Tuple> persistentKeyBy(
+                       DataStream<T> dataStream,
+                       String topic,
+                       int producerParallelism,
+                       int numberOfPartitions,
+                       Properties properties,
+                       int... fields) {
+               return persistentKeyBy(
+                       dataStream,
+                       topic,
+                       producerParallelism,
+                       numberOfPartitions,
+                       properties,
+                       keySelector(dataStream, fields));
+       }
+
+       /**
+        * The write side of {@link FlinkKafkaShuffle#persistentKeyBy}.
+        *
+        * <p>This function contains a {@link FlinkKafkaShuffleProducer} to 
shuffle and persist data in Kafka.
+        * {@link FlinkKafkaShuffleProducer} uses the same key group assignment 
function
+        * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to 
decide which partition a key goes.
+        * Hence, each producer task can potentially write to each Kafka 
partition based on the key.
+        * Here, the number of partitions equals to the key group size.
+        * In the case of using {@link TimeCharacteristic#EventTime}, each 
producer task broadcasts each watermark
+        * to all of the Kafka partitions to make sure watermark information is 
propagated properly.
+        *
+        * <p>Attention: make sure kafkaProperties include
+        * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} and {@link 
FlinkKafkaShuffle#PARTITION_NUMBER} explicitly.
+        * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} is the parallelism of 
the producer.
+        * {@link FlinkKafkaShuffle#PARTITION_NUMBER} is the number of 
partitions.
+        * They are not necessarily the same and allowed to be set 
independently.
+        *
+        * @see FlinkKafkaShuffle#persistentKeyBy
+        * @see FlinkKafkaShuffle#readKeyBy
+        *
+        * @param dataStream                    Data stream to be shuffled
+        * @param topic                                 Kafka topic written to
+        * @param kafkaProperties               Kafka properties for Kafka 
Producer
+        * @param keySelector                   Key selector to retrieve key 
from `dataStream'
+        * @param <T>                                   Type of the input data 
stream
+        * @param <K>                                   Type of key
+        */
+       public static <T, K> void writeKeyBy(
+                       DataStream<T> dataStream,
+                       String topic,
+                       Properties kafkaProperties,
+                       KeySelector<T, K> keySelector) {
+
+               StreamExecutionEnvironment env = 
dataStream.getExecutionEnvironment();
+               TypeSerializer<T> typeSerializer = 
dataStream.getType().createSerializer(env.getConfig());
+
+               // write data to Kafka
+               FlinkKafkaShuffleProducer<T, K> kafkaProducer = new 
FlinkKafkaShuffleProducer<>(
+                       topic,
+                       typeSerializer,
+                       kafkaProperties,
+                       env.clean(keySelector),
+                       FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
+                       FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+
+               // make sure the sink parallelism is set to producerParallelism
+               Preconditions.checkArgument(
+                       kafkaProperties.getProperty(PRODUCER_PARALLELISM) != 
null,
+                       "Missing producer parallelism for Kafka Shuffle");
+               int producerParallelism = 
PropertiesUtil.getInt(kafkaProperties, PRODUCER_PARALLELISM, Integer.MIN_VALUE);
+
+               addKafkaShuffle(dataStream, kafkaProducer, producerParallelism);
+       }
+
+       /**
+        * The write side of {@link FlinkKafkaShuffle#persistentKeyBy}.

Review comment:
       copy javadoc
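
As background for the `PropertiesUtil.flatten(properties)` call in the quoted `persistentKeyBy`: the code comment says default properties are lost when `Properties` is treated purely as a map. The JDK-only sketch below shows that effect. The `flatten` method here is a hypothetical stand-in written for this example, not Flink's `PropertiesUtil.flatten`, and the property names and values are made up.

```java
import java.util.Properties;

// Illustrative sketch only; flatten() is a stand-in, not Flink's utility.
final class PropertiesDefaultsSketch {

    // Copies every visible key, including keys that only exist in the defaults,
    // into a new Properties object without a defaults chain.
    static Properties flatten(Properties config) {
        Properties flat = new Properties();
        for (String name : config.stringPropertyNames()) {
            flat.setProperty(name, config.getProperty(name));
        }
        return flat;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        defaults.setProperty("bootstrap.servers", "localhost:9092");

        Properties props = new Properties(defaults);
        props.setProperty("group.id", "shuffle");

        // getProperty consults the defaults chain.
        System.out.println(props.getProperty("bootstrap.servers"));
        // The map view (containsKey, entrySet, ...) does not.
        System.out.println(props.containsKey("bootstrap.servers"));
        // After flattening, the key is a first-level entry.
        System.out.println(flatten(props).containsKey("bootstrap.servers"));
    }
}
```

Any consumer of the object that iterates over the map entries sees only first-level keys, which is why flattening before handing the properties to Kafka matters.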






Reply via email to