AHeise commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425298217



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -210,7 +210,7 @@
        /**
         * The name of the default topic this producer is writing data to.
         */
-       private final String defaultTopicId;
+       protected final String defaultTopicId;

Review comment:
       Would package-private suffice? (Same for all similar changes in this and 
the next commit).

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -210,7 +210,7 @@
        /**

Review comment:
       Commit message should include a bit more detail, so that someone who 
doesn't know FLINK-15670 by heart knows what's going on.
   For example:
   ```
   [FLINK-15670][connector] Adds the producer for KafkaShuffle.
   
   KafkaShuffle provides a transparent Kafka source and sink pair, through 
which the network traffic of one shuffle step is redirected.
   ```

##########
File path: flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
##########
@@ -108,6 +109,27 @@ public static boolean getBoolean(Properties config, String 
key, boolean defaultV
                }
        }
 
+       /**
+        * Flatten a recursive {@link Properties} to a first level property map.
+        * In some cases, {KafkaProducer#propsToMap} for example, Properties is 
used purely as a HashTable

Review comment:
       It's not a proper link, but I guess a proper one is not possible either 
because of the redirection. Maybe just use `{@code KafkaProducer#propsToMap}`.
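
To illustrate the suggestion, here is a minimal sketch of how the Javadoc could read with `{@code ...}`, together with one possible way to flatten nested defaults. The class name `FlattenSketch` and the method body are assumptions for illustration only, not the code in this PR; the sketch relies only on the standard `java.util.Properties` API.

```java
import java.util.Properties;

final class FlattenSketch {

    /**
     * Copies every visible entry, including inherited defaults, into a flat
     * {@link Properties}. Some callers, {@code KafkaProducer#propsToMap} for
     * example, use Properties purely as a hash table, and a plain table
     * lookup does not consult the defaults.
     */
    static Properties flatten(Properties config) {
        Properties flat = new Properties();
        // stringPropertyNames() also walks the chain of default Properties.
        for (String name : config.stringPropertyNames()) {
            flat.setProperty(name, config.getProperty(name));
        }
        return flat;
    }

    public static void main(String[] args) {
        Properties defaults = new Properties();
        defaults.setProperty("bootstrap.servers", "localhost:9092");
        Properties nested = new Properties(defaults);
        nested.setProperty("group.id", "shuffle");

        // Hashtable-style get() on the nested object versus on the flattened copy.
        System.out.println(nested.get("bootstrap.servers"));
        System.out.println(flatten(nested).get("bootstrap.servers"));
    }
}
```

`{@code ...}` renders the reference in monospace without Javadoc trying to resolve it, which avoids a broken `{@link}` to a class that may not be on the Javadoc classpath.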

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling 
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> 
{
+       private final KafkaSerializer<IN> kafkaSerializer;
+       private final KeySelector<IN, KEY> keySelector;
+       private final int numberOfPartitions;
+
+       FlinkKafkaShuffleProducer(
+                       String defaultTopicId,
+                       TypeInformationSerializationSchema<IN> schema,
+                       Properties props,
+                       KeySelector<IN, KEY> keySelector,
+                       Semantic semantic,
+                       int kafkaProducersPoolSize) {
+               super(defaultTopicId, (element, timestamp) -> null, props, 
semantic, kafkaProducersPoolSize);
+
+               this.kafkaSerializer = new 
KafkaSerializer<>(schema.getSerializer());
+               this.keySelector = keySelector;
+
+               Preconditions.checkArgument(
+                       props.getProperty(PARTITION_NUMBER) != null,
+                       "Missing partition number for Kafka Shuffle");
+               numberOfPartitions = PropertiesUtil.getInt(props, 
PARTITION_NUMBER, Integer.MIN_VALUE);
+       }
+
+       /**
+        * This is the function invoked to handle each element.
+        * @param transaction transaction state;
+        *                    elements are written to Kafka in transactions to 
guarantee different level of data consistency
+        * @param next element to handle
+        * @param context context needed to handle the element
+        * @throws FlinkKafkaException for kafka error
+        */
+       @Override
+       public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws FlinkKafkaException {
+               checkErroneous();
+
+               // write timestamp to Kafka if timestamp is available
+               Long timestamp = context.timestamp();
+
+               int[] partitions = getPartitions(transaction);
+               int partitionIndex;
+               try {
+                       partitionIndex = KeyGroupRangeAssignment
+                               
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length, 
partitions.length);
+               } catch (Exception e) {
+                       throw new RuntimeException("Fail to assign a partition 
number to record");

Review comment:
       Please use exception chaining:
   `throw new RuntimeException("Fail to assign a partition number to record", 
e)`
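
A small standalone sketch of why the cause matters. `ExceptionChainingSketch` and `assignPartition` are hypothetical stand-ins for the key-to-partition step in `invoke()`, not code from this PR; only the error message is taken from the diff.

```java
import java.util.function.Function;

final class ExceptionChainingSketch {

    // Hypothetical stand-in for the key-to-partition step in invoke().
    static int assignPartition(Function<String, Integer> keySelector, String element) {
        try {
            return keySelector.apply(element);
        } catch (Exception e) {
            // Passing 'e' as the cause keeps the original exception and its
            // stack trace reachable through getCause().
            throw new RuntimeException("Fail to assign a partition number to record", e);
        }
    }

    public static void main(String[] args) {
        try {
            assignPartition(s -> { throw new IllegalStateException("bad key"); }, "record");
        } catch (RuntimeException e) {
            // The wrapped exception is still available for logging and debugging.
            System.out.println(e.getMessage() + " / cause: " + e.getCause());
        }
    }
}
```

Without the second constructor argument, `getCause()` returns `null` and the failure inside the key selector is lost.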
   

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling 
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> 
{
+       private final KafkaSerializer<IN> kafkaSerializer;
+       private final KeySelector<IN, KEY> keySelector;
+       private final int numberOfPartitions;
+
+       FlinkKafkaShuffleProducer(
+                       String defaultTopicId,
+                       TypeInformationSerializationSchema<IN> schema,
+                       Properties props,
+                       KeySelector<IN, KEY> keySelector,
+                       Semantic semantic,
+                       int kafkaProducersPoolSize) {
+               super(defaultTopicId, (element, timestamp) -> null, props, 
semantic, kafkaProducersPoolSize);
+
+               this.kafkaSerializer = new 
KafkaSerializer<>(schema.getSerializer());
+               this.keySelector = keySelector;
+
+               Preconditions.checkArgument(
+                       props.getProperty(PARTITION_NUMBER) != null,
+                       "Missing partition number for Kafka Shuffle");
+               numberOfPartitions = PropertiesUtil.getInt(props, 
PARTITION_NUMBER, Integer.MIN_VALUE);
+       }
+
+       /**
+        * This is the function invoked to handle each element.
+        * @param transaction transaction state;
+        *                    elements are written to Kafka in transactions to 
guarantee different level of data consistency
+        * @param next element to handle
+        * @param context context needed to handle the element
+        * @throws FlinkKafkaException for kafka error
+        */
+       @Override
+       public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws FlinkKafkaException {
+               checkErroneous();
+
+               // write timestamp to Kafka if timestamp is available
+               Long timestamp = context.timestamp();
+
+               int[] partitions = getPartitions(transaction);
+               int partitionIndex;
+               try {
+                       partitionIndex = KeyGroupRangeAssignment
+                               
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length, 
partitions.length);
+               } catch (Exception e) {
+                       throw new RuntimeException("Fail to assign a partition 
number to record");
+               }
+
+               ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                       defaultTopicId, partitionIndex, timestamp, null, 
kafkaSerializer.serializeRecord(next, timestamp));
+               pendingRecords.incrementAndGet();
+               transaction.getProducer().send(record, callback);
+       }
+
+       /**
+        * This is the function invoked to handle each watermark.
+        * @param watermark watermark to handle
+        * @throws FlinkKafkaException for kafka error
+        */
+       public void invoke(Watermark watermark) throws FlinkKafkaException {
+               checkErroneous();
+               KafkaTransactionState transaction = currentTransaction();
+
+               int[] partitions = getPartitions(transaction);
+               int subtask = getRuntimeContext().getIndexOfThisSubtask();
+
+               // broadcast watermark
+               long timestamp = watermark.getTimestamp();
+               for (int partition : partitions) {
+                       ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(
+                               defaultTopicId, partition, timestamp, null, 
kafkaSerializer.serializeWatermark(watermark, subtask));
+                       pendingRecords.incrementAndGet();
+                       transaction.getProducer().send(record, callback);
+               }
+       }
+
+       private int[] getPartitions(KafkaTransactionState transaction) {
+               int[] partitions = topicPartitionsMap.get(defaultTopicId);
+               if (partitions == null) {
+                       partitions = getPartitionsByTopic(defaultTopicId, 
transaction.getProducer());
+                       topicPartitionsMap.put(defaultTopicId, partitions);
+               }
+
+               Preconditions.checkArgument(partitions.length == 
numberOfPartitions);
+
+               return partitions;
+       }
+
+       /**
+        * Flink Kafka Shuffle Serializer.
+        */
+       public static final class KafkaSerializer<IN> implements Serializable {
+               public static final int TAG_REC_WITH_TIMESTAMP = 0;
+               public static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
+               public static final int TAG_WATERMARK = 2;
+
+               private final TypeSerializer<IN> serializer;
+
+               private transient DataOutputSerializer dos;
+
+               KafkaSerializer(TypeSerializer<IN> serializer) {
+                       this.serializer = serializer;
+               }
+
+               /**
+                * Format: TAG, (timestamp), record.
+                */
+               byte[] serializeRecord(IN record, Long timestamp) {
+                       if (dos == null) {
+                               dos = new DataOutputSerializer(16);
+                       }
+
+                       try {
+                               if (timestamp == null) {
+                                       dos.writeInt(TAG_REC_WITHOUT_TIMESTAMP);

Review comment:
       Would `writeByte` suffice?
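
For reference, a rough sketch of the size difference, using `java.io.DataOutputStream` as a stand-in for `DataOutputSerializer` (both expose the `java.io.DataOutput` write methods). The class `TagWidthSketch` and its `serialize` helper are made up for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class TagWidthSketch {
    static final int TAG_REC_WITHOUT_TIMESTAMP = 1;

    /** Serializes tag + payload; the tag is written either as an int or as a single byte. */
    static byte[] serialize(byte[] payload, boolean tagAsByte) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            if (tagAsByte) {
                out.writeByte(TAG_REC_WITHOUT_TIMESTAMP); // 1 byte
            } else {
                out.writeInt(TAG_REC_WITHOUT_TIMESTAMP);  // 4 bytes, big-endian
            }
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = {10, 20, 30};
        System.out.println("int tag:  " + serialize(payload, false).length + " bytes");
        System.out.println("byte tag: " + serialize(payload, true).length + " bytes");
    }
}
```

With only three tag values, a byte saves three bytes per record and per broadcast watermark. The reading side would have to switch to `readByte` at the same time.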

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair 
together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+       static final String PRODUCER_PARALLELISM = "producer parallelism";
+       static final String PARTITION_NUMBER = "partition number";
+
+       /**
+        * Write to and read from a kafka shuffle with the partition decided by 
keys.
+        * Consumers should read partitions equal to the key group indices they 
are assigned.
+        * The number of partitions is the maximum parallelism of the receiving 
operator.
+        * This version only supports numberOfPartitions = consumerParallelism.
+        *
+        * @param inputStream input stream to the kafka

Review comment:
       Descriptions of parameters are usually capitalized to make them easier 
to read (also in code).

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> 
{

Review comment:
       I see a lot of duplication with `KafkaFetcher`. I'm curious why you 
didn't choose to subclass in the same way as for the producer/consumer.
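
Roughly what I have in mind, as a simplified sketch. `BaseFetcher`, `ShuffleFetcher` and the `handleRecord` hook are hypothetical names; whether `KafkaFetcher` can actually expose such a hook is an assumption that would need checking against the real class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FetcherSubclassSketch {

    /** Hypothetical stand-in for the existing fetcher: owns the loop, exposes one hook. */
    static class BaseFetcher {
        final List<String> emitted = new ArrayList<>();

        // The shared loop stays in the base class and is not duplicated.
        final void runFetchLoop(List<byte[]> records) {
            for (byte[] record : records) {
                handleRecord(record);
            }
        }

        /** Default behaviour: treat every record as a plain value. */
        protected void handleRecord(byte[] record) {
            emitted.add("record:" + record.length);
        }
    }

    /** Hypothetical shuffle variant: only the per-record handling differs. */
    static class ShuffleFetcher extends BaseFetcher {
        static final byte TAG_WATERMARK = 2;

        @Override
        protected void handleRecord(byte[] record) {
            if (record.length > 0 && record[0] == TAG_WATERMARK) {
                emitted.add("watermark");
            } else {
                super.handleRecord(record);
            }
        }
    }

    public static void main(String[] args) {
        ShuffleFetcher fetcher = new ShuffleFetcher();
        fetcher.runFetchLoop(Arrays.asList(new byte[] {2}, new byte[] {0, 7}));
        System.out.println(fetcher.emitted);
    }
}
```

The loop, shutdown handling and offset committing would then live in one place, and the shuffle fetcher would only override how a single record or watermark is handled.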

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> 
{
+       private static final Logger LOG = 
LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+       private final WatermarkHandler watermarkHandler;
+       // 
------------------------------------------------------------------------
+
+       /** The schema to convert between Kafka's byte messages, and Flink's 
objects. */
+       private final KafkaShuffleElementDeserializer<T> deserializer;
+
+       /** Serializer to serialize record. */
+       private final TypeSerializer<T> serializer;
+
+       /** The handover of data and exceptions between the consumer thread and 
the task thread. */
+       private final Handover handover;
+
+       /** The thread that runs the actual KafkaConsumer and hand the record 
batches to this fetcher. */
+       private final KafkaConsumerThread consumerThread;
+
+       /** Flag to mark the main work loop as alive. */
+       private volatile boolean running = true;
+
+       public KafkaShuffleFetcher(
+                       SourceFunction.SourceContext<T> sourceContext,
+                       Map<KafkaTopicPartition, Long> 
assignedPartitionsWithInitialOffsets,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       ProcessingTimeService processingTimeProvider,
+                       long autoWatermarkInterval,
+                       ClassLoader userCodeClassLoader,
+                       String taskNameWithSubtasks,
+                       TypeSerializer<T> serializer,
+                       Properties kafkaProperties,
+                       long pollTimeout,
+                       MetricGroup subtaskMetricGroup,
+                       MetricGroup consumerMetricGroup,
+                       boolean useMetrics,
+                       int producerParallelism) throws Exception {
+               super(
+                       sourceContext,
+                       assignedPartitionsWithInitialOffsets,
+                       watermarksPeriodic,
+                       watermarksPunctuated,
+                       processingTimeProvider,
+                       autoWatermarkInterval,
+                       userCodeClassLoader,
+                       consumerMetricGroup,
+                       useMetrics);
+               this.deserializer = new KafkaShuffleElementDeserializer<>();
+               this.serializer = serializer;
+               this.handover = new Handover();
+               this.consumerThread = new KafkaConsumerThread(
+                       LOG,
+                       handover,
+                       kafkaProperties,
+                       unassignedPartitionsQueue,
+                       getFetcherName() + " for " + taskNameWithSubtasks,
+                       pollTimeout,
+                       useMetrics,
+                       consumerMetricGroup,
+                       subtaskMetricGroup);
+               this.watermarkHandler = new 
WatermarkHandler(producerParallelism);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Fetcher work methods
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void runFetchLoop() throws Exception {
+               try {
+                       final Handover handover = this.handover;
+
+                       // kick off the actual Kafka consumer
+                       consumerThread.start();
+
+                       while (running) {
+                               // this blocks until we get the next records
+                               // it automatically re-throws exceptions 
encountered in the consumer thread
+                               final ConsumerRecords<byte[], byte[]> records = 
handover.pollNext();
+
+                               // get the records for each topic partition
+                               for (KafkaTopicPartitionState<TopicPartition> 
partition : subscribedPartitionStates()) {
+                                       List<ConsumerRecord<byte[], byte[]>> 
partitionRecords =
+                                               
records.records(partition.getKafkaPartitionHandle());
+
+                                       for (ConsumerRecord<byte[], byte[]> 
record : partitionRecords) {
+                                               final KafkaShuffleElement<T> 
element = deserializer.deserialize(serializer, record);
+
+                                               // TODO: do we need to check 
the end of stream if reaching the end watermark?
+
+                                               if (element.isRecord()) {
+                                                       // timestamp is 
inherent from upstream
+                                                       // If using 
ProcessTime, timestamp is going to be ignored (upstream does not include 
timestamp as well)
+                                                       // If using 
IngestionTime, timestamp is going to be overwritten
+                                                       // If using EventTime, 
timestamp is going to be used
+                                                       synchronized 
(checkpointLock) {
+                                                               
KafkaShuffleRecord<T> elementAsRecord = element.asRecord();
+                                                               
sourceContext.collectWithTimestamp(
+                                                                       
elementAsRecord.value,
+                                                                       
elementAsRecord.timestamp == null ? record.timestamp() : 
elementAsRecord.timestamp);
+                                                               
partition.setOffset(record.offset());
+                                                       }
+                                               } else if 
(element.isWatermark()) {
+                                                       final 
KafkaShuffleWatermark watermark = element.asWatermark();
+                                                       Optional<Watermark> 
newWatermark = watermarkHandler.checkAndGetNewWatermark(watermark);
+                                                       
newWatermark.ifPresent(sourceContext::emitWatermark);
+                                               }
+                                       }
+                               }
+                       }
+               }
+               finally {
+                       // this signals the consumer thread that no more work 
is to be done
+                       consumerThread.shutdown();
+               }
+
+               // on a clean exit, wait for the runner thread
+               try {
+                       consumerThread.join();
+               }
+               catch (InterruptedException e) {
+                       // may be the result of a wake-up interruption after an 
exception.
+                       // we ignore this here and only restore the 
interruption state
+                       Thread.currentThread().interrupt();
+               }
+       }
+
+       @Override
+       public void cancel() {
+               // flag the main thread to exit. A thread interrupt will come 
anyways.
+               running = false;
+               handover.close();
+               consumerThread.shutdown();
+       }
+
+       @Override
+       protected TopicPartition createKafkaPartitionHandle(KafkaTopicPartition 
partition) {
+               return new TopicPartition(partition.getTopic(), 
partition.getPartition());
+       }
+
+       @Override
+       protected void doCommitInternalOffsetsToKafka(
+                       Map<KafkaTopicPartition, Long> offsets,
+                       @Nonnull KafkaCommitCallback commitCallback) throws 
Exception {
+               @SuppressWarnings("unchecked")
+               List<KafkaTopicPartitionState<TopicPartition>> partitions = 
subscribedPartitionStates();
+
+               Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new 
HashMap<>(partitions.size());
+
+               for (KafkaTopicPartitionState<TopicPartition> partition : 
partitions) {
+                       Long lastProcessedOffset = 
offsets.get(partition.getKafkaTopicPartition());
+                       if (lastProcessedOffset != null) {
+                               checkState(lastProcessedOffset >= 0, "Illegal 
offset value to commit");
+
+                               // committed offsets through the KafkaConsumer 
need to be 1 more than the last processed offset.
+                               // This does not affect Flink's 
checkpoints/saved state.
+                               long offsetToCommit = lastProcessedOffset + 1;
+
+                               
offsetsToCommit.put(partition.getKafkaPartitionHandle(), new 
OffsetAndMetadata(offsetToCommit));
+                               partition.setCommittedOffset(offsetToCommit);
+                       }
+               }
+
+               // record the work to be committed by the main consumer thread 
and make sure the consumer notices that
+               consumerThread.setOffsetsToCommit(offsetsToCommit, 
commitCallback);
+       }
+
+       private String getFetcherName() {
+               return "Kafka Shuffle Fetcher";
+       }
+
+       /**
+        * An element in a KafkaShuffle. Can be a record or a Watermark.
+        */
+       @VisibleForTesting
+       public abstract static class KafkaShuffleElement<T> {
+
+               public boolean isRecord() {
+                       return getClass() == KafkaShuffleRecord.class;
+               }
+
+               public boolean isWatermark() {
+                       return getClass() == KafkaShuffleWatermark.class;
+               }
+
+               public KafkaShuffleRecord<T> asRecord() {
+                       return (KafkaShuffleRecord<T>) this;
+               }
+
+               public KafkaShuffleWatermark asWatermark() {
+                       return (KafkaShuffleWatermark) this;
+               }
+       }
+
+       /**
+        * A watermark element in a KafkaShuffle. It includes
+        * - subtask index where the watermark is coming from
+        * - watermark timestamp
+        */
+       @VisibleForTesting
+       public static class KafkaShuffleWatermark<T> extends 
KafkaShuffleElement<T> {
+               final int subtask;
+               final long watermark;
+
+               KafkaShuffleWatermark(int subtask, long watermark) {
+                       this.subtask = subtask;
+                       this.watermark = watermark;
+               }
+
+               public int getSubtask() {
+                       return subtask;
+               }
+
+               public long getWatermark() {
+                       return watermark;
+               }
+       }
+
+       /**
+        * One value with Type T in a KafkaShuffle. This stores the value and 
an optional associated timestamp.
+        */
+       @VisibleForTesting
+       public static class KafkaShuffleRecord<T> extends 
KafkaShuffleElement<T> {
+               final T value;
+               final Long timestamp;
+
+               KafkaShuffleRecord(T value) {
+                       this.value = value;
+                       this.timestamp = null;
+               }
+
+               KafkaShuffleRecord(long timestamp, T value) {
+                       this.value = value;
+                       this.timestamp = timestamp;
+               }
+
+               public T getValue() {
+                       return value;
+               }
+
+               public Long getTimestamp() {
+                       return timestamp;
+               }
+       }
+
+       /**
+        * Deserializer for KafkaShuffleElement.
+        * @param <T>
+        */
+       @VisibleForTesting
+       public static class KafkaShuffleElementDeserializer<T> implements 
Serializable {
+               private transient DataInputDeserializer dis;
+
+               @VisibleForTesting
+               public KafkaShuffleElementDeserializer() {
+                       this.dis = new DataInputDeserializer();
+               }
+
+               @VisibleForTesting
+               public KafkaShuffleElement<T> deserialize(TypeSerializer<T> 
serializer, ConsumerRecord<byte[], byte[]> record)
+                       throws Exception {
+                       byte[] value = record.value();
+                       dis.setBuffer(value);

Review comment:
       `dis` is `transient`, so it is not initialized when 
`KafkaShuffleElementDeserializer` gets deserialized (the constructor does not 
run in that case).
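
   A minimal sketch of the problem and one possible fix, not the code of this 
PR: `TransientFieldDemo`, `Eager` and `Lazy` are hypothetical names, and a 
`StringBuilder` stands in for `DataInputDeserializer`. Java serialization skips 
`transient` fields and runs neither the constructor nor the field initializers 
of a `Serializable` class, so the field is `null` after a round trip unless it 
is created lazily (or restored in `readObject`).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class; StringBuilder stands in for DataInputDeserializer.
class TransientFieldDemo {

    /** Initializes the transient field only at construction time. */
    static class Eager implements Serializable {
        private static final long serialVersionUID = 1L;

        // Not written by Java serialization, and not re-created on deserialization.
        private transient StringBuilder buffer = new StringBuilder();

        boolean hasBuffer() {
            return buffer != null;
        }
    }

    /** Creates the transient field on first use, so it also works after deserialization. */
    static class Lazy implements Serializable {
        private static final long serialVersionUID = 1L;

        private transient StringBuilder buffer;

        StringBuilder buffer() {
            if (buffer == null) {
                buffer = new StringBuilder();
            }
            return buffer;
        }
    }

    /** Serializes and deserializes the object with plain Java serialization. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

   With this sketch, `roundTrip(new Eager()).hasBuffer()` is `false`, while 
`roundTrip(new Lazy()).buffer()` returns a usable instance.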

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling 
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> 
{
+       private final KafkaSerializer<IN> kafkaSerializer;
+       private final KeySelector<IN, KEY> keySelector;
+       private final int numberOfPartitions;
+
+       FlinkKafkaShuffleProducer(
+                       String defaultTopicId,
+                       TypeInformationSerializationSchema<IN> schema,
+                       Properties props,
+                       KeySelector<IN, KEY> keySelector,
+                       Semantic semantic,
+                       int kafkaProducersPoolSize) {
+               super(defaultTopicId, (element, timestamp) -> null, props, 
semantic, kafkaProducersPoolSize);
+
+               this.kafkaSerializer = new 
KafkaSerializer<>(schema.getSerializer());
+               this.keySelector = keySelector;
+
+               Preconditions.checkArgument(
+                       props.getProperty(PARTITION_NUMBER) != null,
+                       "Missing partition number for Kafka Shuffle");
+               numberOfPartitions = PropertiesUtil.getInt(props, 
PARTITION_NUMBER, Integer.MIN_VALUE);
+       }
+
+       /**
+        * This is the function invoked to handle each element.
+        * @param transaction transaction state;
+        *                    elements are written to Kafka in transactions to 
guarantee different level of data consistency
+        * @param next element to handle
+        * @param context context needed to handle the element
+        * @throws FlinkKafkaException for kafka error
+        */
+       @Override
+       public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws FlinkKafkaException {
+               checkErroneous();
+
+               // write timestamp to Kafka if timestamp is available
+               Long timestamp = context.timestamp();
+
+               int[] partitions = getPartitions(transaction);
+               int partitionIndex;
+               try {
+                       partitionIndex = KeyGroupRangeAssignment
+                               
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length, 
partitions.length);
+               } catch (Exception e) {
+                       throw new RuntimeException("Fail to assign a partition 
number to record");
+               }
+
+               ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                       defaultTopicId, partitionIndex, timestamp, null, 
kafkaSerializer.serializeRecord(next, timestamp));

Review comment:
       If the arguments do not fit on the line of the invocation, all of them 
need to be chopped onto separate lines (including the first one).
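
   A sketch of the chopped layout, assuming a hypothetical `OutRecord` type in 
place of Kafka's `ProducerRecord` so that the example compiles on its own; only 
the argument layout matters here.

```java
// Hypothetical stand-in for ProducerRecord<byte[], byte[]>; not a Kafka class.
final class OutRecord {
    final String topic;
    final int partition;
    final Long timestamp;
    final byte[] key;
    final byte[] value;

    OutRecord(String topic, int partition, Long timestamp, byte[] key, byte[] value) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }
}

class ChoppedArguments {
    static OutRecord build(String topic, int partition, Long timestamp, byte[] payload) {
        // The call does not fit on one line, so every argument gets its own line,
        // including the first one.
        return new OutRecord(
            topic,
            partition,
            timestamp,
            null,
            payload);
    }
}
```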

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling 
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> 
{
+       private final KafkaSerializer<IN> kafkaSerializer;
+       private final KeySelector<IN, KEY> keySelector;
+       private final int numberOfPartitions;
+
+       FlinkKafkaShuffleProducer(
+                       String defaultTopicId,
+                       TypeInformationSerializationSchema<IN> schema,
+                       Properties props,
+                       KeySelector<IN, KEY> keySelector,
+                       Semantic semantic,
+                       int kafkaProducersPoolSize) {
+               super(defaultTopicId, (element, timestamp) -> null, props, 
semantic, kafkaProducersPoolSize);
+
+               this.kafkaSerializer = new 
KafkaSerializer<>(schema.getSerializer());
+               this.keySelector = keySelector;
+
+               Preconditions.checkArgument(
+                       props.getProperty(PARTITION_NUMBER) != null,
+                       "Missing partition number for Kafka Shuffle");
+               numberOfPartitions = PropertiesUtil.getInt(props, 
PARTITION_NUMBER, Integer.MIN_VALUE);
+       }
+
+       /**
+        * This is the function invoked to handle each element.
+        * @param transaction transaction state;
+        *                    elements are written to Kafka in transactions to 
guarantee different level of data consistency
+        * @param next element to handle
+        * @param context context needed to handle the element
+        * @throws FlinkKafkaException for kafka error
+        */
+       @Override
+       public void invoke(KafkaTransactionState transaction, IN next, Context 
context) throws FlinkKafkaException {
+               checkErroneous();
+
+               // write timestamp to Kafka if timestamp is available
+               Long timestamp = context.timestamp();
+
+               int[] partitions = getPartitions(transaction);
+               int partitionIndex;
+               try {
+                       partitionIndex = KeyGroupRangeAssignment
+                               
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length, 
partitions.length);
+               } catch (Exception e) {
+                       throw new RuntimeException("Fail to assign a partition 
number to record");
+               }
+
+               ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                       defaultTopicId, partitionIndex, timestamp, null, 
kafkaSerializer.serializeRecord(next, timestamp));
+               pendingRecords.incrementAndGet();
+               transaction.getProducer().send(record, callback);
+       }
+
+       /**
+        * This is the function invoked to handle each watermark.
+        * @param watermark watermark to handle
+        * @throws FlinkKafkaException for kafka error
+        */
+       public void invoke(Watermark watermark) throws FlinkKafkaException {
+               checkErroneous();
+               KafkaTransactionState transaction = currentTransaction();
+
+               int[] partitions = getPartitions(transaction);
+               int subtask = getRuntimeContext().getIndexOfThisSubtask();
+
+               // broadcast watermark
+               long timestamp = watermark.getTimestamp();
+               for (int partition : partitions) {
+                       ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(
+                               defaultTopicId, partition, timestamp, null, 
kafkaSerializer.serializeWatermark(watermark, subtask));

Review comment:
       Chop the arguments here as well.

##########
File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
##########
@@ -73,7 +73,7 @@
 
        /** The lock that guarantees that record emission and state updates are 
atomic,
         * from the view of taking a checkpoint. */
-       private final Object checkpointLock;
+       protected final Object checkpointLock;

Review comment:
       package-private?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling 
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> 
{
+       private final KafkaSerializer<IN> kafkaSerializer;
+       private final KeySelector<IN, KEY> keySelector;
+       private final int numberOfPartitions;
+
+       FlinkKafkaShuffleProducer(
+                       String defaultTopicId,
+                       TypeInformationSerializationSchema<IN> schema,

Review comment:
       Pass serializer directly.

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
##########
@@ -126,4 +126,8 @@ public boolean isEndOfStream(T nextElement) {
        public TypeInformation<T> getProducedType() {
                return typeInfo;
        }
+
+       public TypeSerializer<T> getSerializer() {

Review comment:
       Unneeded API change (see `FlinkKafkaShuffleProducer`).

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> 
{
+       private static final Logger LOG = 
LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+       private final WatermarkHandler watermarkHandler;

Review comment:
       Add a comment? Just to be symmetric with the other fields (not strictly 
needed).

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> 
{
+       private static final Logger LOG = 
LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+       private final WatermarkHandler watermarkHandler;
+       // 
------------------------------------------------------------------------

Review comment:
       This separator line does not make sense here.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair 
together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+       static final String PRODUCER_PARALLELISM = "producer parallelism";
+       static final String PARTITION_NUMBER = "partition number";
+
+       /**
+        * Write to and read from a kafka shuffle with the partition decided by 
keys.
+        * Consumers should read partitions equal to the key group indices they 
are assigned.

Review comment:
       Usually, after the first sentence (or the summary, if it is more than 
one sentence), we have a line break and start the next paragraph with `<p>`.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair 
together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+       static final String PRODUCER_PARALLELISM = "producer parallelism";
+       static final String PARTITION_NUMBER = "partition number";
+
+       /**
+        * Write to and read from a kafka shuffle with the partition decided by 
keys.

Review comment:
       Method javadocs usually start in the third person (`Writes to ...`); see 
also `DataStream`.
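
   A small sketch of that javadoc shape, on a hypothetical helper 
(`JavadocStyleExample#partitionFor` is not part of this PR): the summary 
sentence starts with a third-person verb, and further paragraphs follow after 
an empty line and start with `<p>`.

```java
class JavadocStyleExample {

    /**
     * Returns the partition index for the given key.
     *
     * <p>The index is the key's hash code modulo the number of partitions and is
     * always in the range {@code [0, numberOfPartitions)}.
     *
     * @param key the key to assign; must not be null
     * @param numberOfPartitions the number of partitions; must be positive
     * @return the partition index for the key
     */
    static int partitionFor(Object key, int numberOfPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numberOfPartitions);
    }
}
```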

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> 
{
+       private static final Logger LOG = 
LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+       private final WatermarkHandler watermarkHandler;
+       // 
------------------------------------------------------------------------
+
+       /** The schema to convert between Kafka's byte messages, and Flink's 
objects. */
+       private final KafkaShuffleElementDeserializer<T> deserializer;
+
+       /** Serializer to serialize record. */
+       private final TypeSerializer<T> serializer;
+
+       /** The handover of data and exceptions between the consumer thread and 
the task thread. */
+       private final Handover handover;
+
+       /** The thread that runs the actual KafkaConsumer and hand the record 
batches to this fetcher. */
+       private final KafkaConsumerThread consumerThread;
+
+       /** Flag to mark the main work loop as alive. */
+       private volatile boolean running = true;
+
+       public KafkaShuffleFetcher(
+                       SourceFunction.SourceContext<T> sourceContext,
+                       Map<KafkaTopicPartition, Long> 
assignedPartitionsWithInitialOffsets,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       ProcessingTimeService processingTimeProvider,
+                       long autoWatermarkInterval,
+                       ClassLoader userCodeClassLoader,
+                       String taskNameWithSubtasks,
+                       TypeSerializer<T> serializer,
+                       Properties kafkaProperties,
+                       long pollTimeout,
+                       MetricGroup subtaskMetricGroup,
+                       MetricGroup consumerMetricGroup,
+                       boolean useMetrics,
+                       int producerParallelism) throws Exception {
+               super(
+                       sourceContext,
+                       assignedPartitionsWithInitialOffsets,
+                       watermarksPeriodic,
+                       watermarksPunctuated,
+                       processingTimeProvider,
+                       autoWatermarkInterval,
+                       userCodeClassLoader,
+                       consumerMetricGroup,
+                       useMetrics);
+               this.deserializer = new KafkaShuffleElementDeserializer<>();
+               this.serializer = serializer;
+               this.handover = new Handover();
+               this.consumerThread = new KafkaConsumerThread(
+                       LOG,
+                       handover,
+                       kafkaProperties,
+                       unassignedPartitionsQueue,
+                       getFetcherName() + " for " + taskNameWithSubtasks,
+                       pollTimeout,
+                       useMetrics,
+                       consumerMetricGroup,
+                       subtaskMetricGroup);
+               this.watermarkHandler = new 
WatermarkHandler(producerParallelism);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Fetcher work methods
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void runFetchLoop() throws Exception {
+               try {
+                       final Handover handover = this.handover;
+
+                       // kick off the actual Kafka consumer
+                       consumerThread.start();
+
+                       while (running) {
+                               // this blocks until we get the next records
+                               // it automatically re-throws exceptions 
encountered in the consumer thread
+                               final ConsumerRecords<byte[], byte[]> records = 
handover.pollNext();
+
+                               // get the records for each topic partition
+                               for (KafkaTopicPartitionState<TopicPartition> 
partition : subscribedPartitionStates()) {
+                                       List<ConsumerRecord<byte[], byte[]>> 
partitionRecords =
+                                               
records.records(partition.getKafkaPartitionHandle());
+
+                                       for (ConsumerRecord<byte[], byte[]> 
record : partitionRecords) {
+                                               final KafkaShuffleElement<T> 
element = deserializer.deserialize(serializer, record);
+
+                                               // TODO: do we need to check 
the end of stream if reaching the end watermark?
+
+                                               if (element.isRecord()) {
+                                                       // timestamp is 
inherent from upstream
+                                                       // If using 
ProcessTime, timestamp is going to be ignored (upstream does not include 
timestamp as well)
+                                                       // If using 
IngestionTime, timestamp is going to be overwritten
+                                                       // If using EventTime, 
timestamp is going to be used
+                                                       synchronized 
(checkpointLock) {
+                                                               
KafkaShuffleRecord<T> elementAsRecord = element.asRecord();
+                                                               
sourceContext.collectWithTimestamp(
+                                                                       
elementAsRecord.value,
+                                                                       
elementAsRecord.timestamp == null ? record.timestamp() : 
elementAsRecord.timestamp);
+                                                               
partition.setOffset(record.offset());
+                                                       }
+                                               } else if 
(element.isWatermark()) {
+                                                       final 
KafkaShuffleWatermark watermark = element.asWatermark();
+                                                       Optional<Watermark> 
newWatermark = watermarkHandler.checkAndGetNewWatermark(watermark);
+                                                       
newWatermark.ifPresent(sourceContext::emitWatermark);
+                                               }

Review comment:
       This is the only difference from `KafkaFetcher`, right? It could be 
overridden in an extracted `handleRecord` method or similar.
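
A minimal sketch of that extraction, not the actual Flink classes: 
`BaseFetcher` and `ShuffleFetcher` are hypothetical stand-ins for `KafkaFetcher` 
and `KafkaShuffleFetcher`, records are plain strings, and the "W" prefix for 
watermarks is an invented encoding. The idea is that the loop stays in the base 
class and only the per-record step is overridden.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for KafkaFetcher: owns the loop, delegates per-record work.
class BaseFetcher {
    protected final List<String> emitted = new ArrayList<>();

    // The shared loop; final so that subclasses only customize handleRecord.
    final void runFetchLoop(List<String> polledRecords) {
        for (String record : polledRecords) {
            handleRecord(record);
        }
    }

    // Extension point: the default emits every record unchanged.
    protected void handleRecord(String record) {
        emitted.add("record:" + record);
    }
}

// Hypothetical stand-in for KafkaShuffleFetcher: only the per-record step differs.
class ShuffleFetcher extends BaseFetcher {
    @Override
    protected void handleRecord(String record) {
        if (record.startsWith("W")) {
            // Invented encoding for this sketch: "W<timestamp>" is a watermark.
            emitted.add("watermark:" + record.substring(1));
        } else {
            super.handleRecord(record);
        }
    }
}

class HandleRecordSketch {
    public static void main(String[] args) {
        ShuffleFetcher fetcher = new ShuffleFetcher();
        fetcher.runFetchLoop(Arrays.asList("a", "W5", "b"));
        System.out.println(fetcher.emitted); // prints [record:a, watermark:5, record:b]
    }
}
```

With this shape the shuffle fetcher would no longer need its own copy of 
`runFetchLoop`; whether the real `KafkaFetcher` can be split this cleanly is 
something to check against the actual code.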

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+
+/**
+ * Flink Kafka Shuffle Consumer Function.
+ */
+@Internal
+public class FlinkKafkaShuffleConsumer<T> extends FlinkKafkaConsumer<T> {
+       private final TypeSerializer<T> serializer;
+       private final int producerParallelism;
+
+       FlinkKafkaShuffleConsumer(String topic, 
TypeInformationSerializationSchema<T> schema, Properties props) {

Review comment:
       Same as in the producer: pass the serializer directly.
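
As a rough illustration only (hypothetical names, not the Flink API): 
`Deserializer` stands in for `TypeSerializer`, and `ShuffleConsumerSketch` for 
the consumer. The constructor receives the serializer itself instead of a 
schema object that merely carries one.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical stand-in for the deserializing half of TypeSerializer<T>.
interface Deserializer<T> {
    T deserialize(byte[] bytes);
}

// Hypothetical consumer: depends on the serializer directly, not on a wrapper.
class ShuffleConsumerSketch<T> {
    private final String topic;
    private final Deserializer<T> serializer;

    ShuffleConsumerSketch(String topic, Deserializer<T> serializer) {
        this.topic = Objects.requireNonNull(topic);
        this.serializer = Objects.requireNonNull(serializer);
    }

    String getTopic() {
        return topic;
    }

    T decode(byte[] bytes) {
        return serializer.deserialize(bytes);
    }
}

class PassSerializerDemo {
    public static void main(String[] args) {
        Deserializer<String> utf8 = bytes -> new String(bytes, StandardCharsets.UTF_8);
        ShuffleConsumerSketch<String> consumer = new ShuffleConsumerSketch<>("topic", utf8);
        System.out.println(consumer.decode("abc".getBytes(StandardCharsets.UTF_8)));
    }
}
```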

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair 
together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+       static final String PRODUCER_PARALLELISM = "producer parallelism";
+       static final String PARTITION_NUMBER = "partition number";
+
+       /**
+        * Write to and read from a kafka shuffle with the partition decided by 
keys.
+        * Consumers should read partitions equal to the key group indices they 
are assigned.
+        * The number of partitions is the maximum parallelism of the receiving 
operator.
+        * This version only supports numberOfPartitions = consumerParallelism.
+        *
+        * @param inputStream input stream to the kafka
+        * @param topic kafka topic
+        * @param producerParallelism parallelism of producer
+        * @param numberOfPartitions number of partitions
+        * @param properties Kafka properties
+        * @param fields key positions from inputStream
+        * @param <T> input type
+        */
+       public static <T> KeyedStream<T, Tuple> persistentKeyBy(
+                       DataStream<T> inputStream,
+                       String topic,
+                       int producerParallelism,
+                       int numberOfPartitions,
+                       Properties properties,
+                       int... fields) {
+               return persistentKeyBy(
+                       inputStream, topic, producerParallelism, 
numberOfPartitions, properties, keySelector(inputStream, fields));

Review comment:
       Chop down the arguments (one per line).

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * A customized {@link StreamOperator} for executing {@link 
FlinkKafkaShuffleProducer} that handle
+ * both elements and watermarks. If the shuffle sink is determined to be 
useful to other sinks in the future,
+ * we should abstract this operator to data stream api. For now, we keep the 
operator this way to avoid
+ * public interface change.
+ */
+@Internal
+class StreamKafkaShuffleSink<IN> extends StreamSink<IN> {
+
+       public StreamKafkaShuffleSink(FlinkKafkaShuffleProducer 
flinkKafkaShuffleProducer) {
+               super(flinkKafkaShuffleProducer);
+       }
+
+       @Override
+       public void processWatermark(Watermark mark) throws Exception {
+               super.processWatermark(mark);
+               this.currentWatermark = mark.getTimestamp();

Review comment:
       This is already done by `super`. AFAIK, you then also don't need the 
change in `StreamSink`.
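
A small sketch of the point, under the assumption that the base class already 
records the watermark in its own `processWatermark`. `BaseSink` and 
`ShuffleSink` are hypothetical stand-ins for `StreamSink` and 
`StreamKafkaShuffleSink`, and a plain `long` replaces `Watermark`.

```java
// Hypothetical stand-in for StreamSink: the base class tracks the watermark itself.
class BaseSink {
    private long currentWatermark = Long.MIN_VALUE;

    public void processWatermark(long timestamp) {
        this.currentWatermark = timestamp;
    }

    protected long getCurrentWatermark() {
        return currentWatermark;
    }
}

// Hypothetical stand-in for StreamKafkaShuffleSink.
class ShuffleSink extends BaseSink {
    long lastForwarded = Long.MIN_VALUE;

    @Override
    public void processWatermark(long timestamp) {
        // super already stores the timestamp, so no second assignment is needed
        // and the field can stay private in the base class.
        super.processWatermark(timestamp);
        // Only the shuffle-specific step remains (here: remember what was forwarded).
        lastForwarded = getCurrentWatermark();
    }
}

class ProcessWatermarkSketch {
    public static void main(String[] args) {
        ShuffleSink sink = new ShuffleSink();
        sink.processWatermark(42L);
        System.out.println(sink.lastForwarded); // prints 42
    }
}
```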

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleElement;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleElementDeserializer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleRecord;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleWatermark;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.IngestionTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.fail;
+
+/**
+ * Simple End to End Test for Kafka.
+ */
+public class KafkaShuffleITCase extends KafkaShuffleBase {
+
+       @Rule
+       public final Timeout timeout = Timeout.millis(600000L);
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test no data is lost or duplicated end-2-end with the default 
time characteristic: ProcessingTime
+        */
+
+       @Test
+       public void testSimpleProcessingTime() throws Exception {
+               testKafkaShuffle("test_simple", 200000, ProcessingTime);
+       }
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test no data is lost or duplicated end-2-end with time 
characteristic: IngestionTime
+        */
+       @Test
+       public void testSimpleIngestionTime() throws Exception {
+               testKafkaShuffle("test_simple", 200000, IngestionTime);
+       }
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test no data is lost or duplicated end-2-end with time 
characteristic: EventTime
+        */
+       @Test
+       public void testSimpleEventTime() throws Exception {
+               testKafkaShuffle("test_simple", 100000, EventTime);
+       }
+
+       /**
+        * Producer Parallelism = 2; Kafka Partition # = 3; Consumer 
Parallelism = 3.
+        * To test data is partitioned to the right partition with time 
characteristic: ProcessingTime
+        */
+       @Test
+       public void testAssignedToPartitionProcessingTime() throws Exception {
+               testAssignedToPartition("test_assigned_to_partition", 300000, 
ProcessingTime);
+       }
+
+       /**
+        * Producer Parallelism = 2; Kafka Partition # = 3; Consumer 
Parallelism = 3.
+        * To test data is partitioned to the right partition with time 
characteristic: IngestionTime
+        */
+       @Test
+       public void testAssignedToPartitionIngestionTime() throws Exception {
+               testAssignedToPartition("test_assigned_to_partition", 300000, 
IngestionTime);
+       }
+
+       /**
+        * Producer Parallelism = 2; Kafka Partition # = 3; Consumer 
Parallelism = 3.
+        * To test data is partitioned to the right partition with time 
characteristic: EventTime
+        */
+       @Test
+       public void testAssignedToPartitionEventTime() throws Exception {
+               testAssignedToPartition("test_assigned_to_partition", 100000, 
EventTime);
+       }
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test value serialization and deserialization with time 
characteristic: ProcessingTime
+        */
+       @Test
+       public void testSerDeProcessingTime() throws Exception {
+               testRecordSerDe(ProcessingTime);
+       }
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test value and watermark serialization and deserialization with 
time characteristic: IngestionTime
+        */
+       @Test
+       public void testSerDeIngestionTime() throws Exception {
+               testRecordSerDe(IngestionTime);
+       }
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test value and watermark serialization and deserialization with 
time characteristic: EventTime
+        */
+       @Test
+       public void testSerDeEventTime() throws Exception {
+               testRecordSerDe(EventTime);
+       }
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test value and watermark serialization and deserialization with 
time characteristic: EventTime
+        */
+       @Test
+       public void testWatermarkBroadcasting() throws Exception {
+               final int numberOfPartitions = 3;
+               final int producerParallelism = 2;
+               final int numElementsPerProducer = 1000;
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               Map<Integer, Collection<ConsumerRecord<byte[], byte[]>>> 
results = testKafkaShuffleProducer(
+                       topic("test_watermark_broadcast", EventTime),
+                       env,
+                       numberOfPartitions,
+                       producerParallelism,
+                       numElementsPerProducer,
+                       EventTime);
+               TypeSerializer<Tuple3<Integer, Long, Integer>> typeSerializer = 
createTypeSerializer(env);
+               KafkaShuffleElementDeserializer<Tuple3<Integer, Long, Integer>> 
deserializer = new KafkaShuffleElementDeserializer<>();
+
+               // Records in a single partition are kept in order
+               for (int p = 0; p < numberOfPartitions; p++) {
+                       Collection<ConsumerRecord<byte[], byte[]>> records = 
results.get(p);
+                       Map<Integer, List<KafkaShuffleWatermark>> watermarks = 
new HashMap<>();
+
+                       for (ConsumerRecord<byte[], byte[]> consumerRecord : 
records) {
+                               Assert.assertNull(consumerRecord.key());
+                               KafkaShuffleElement<Tuple3<Integer, Long, 
Integer>> element =
+                                       
deserializer.deserialize(typeSerializer, consumerRecord);
+                               if (element.isRecord()) {
+                                       KafkaShuffleRecord<Tuple3<Integer, 
Long, Integer>> record = element.asRecord();
+                                       
Assert.assertEquals(record.getValue().f1.longValue(), INIT_TIMESTAMP + 
record.getValue().f0);
+                                       
Assert.assertEquals(record.getTimestamp().longValue(), 
record.getValue().f1.longValue());
+                               } else if (element.isWatermark()) {
+                                       KafkaShuffleWatermark watermark = 
element.asWatermark();
+                                       
watermarks.computeIfAbsent(watermark.getSubtask(), k -> new ArrayList<>());
+                                       
watermarks.get(watermark.getSubtask()).add(watermark);
+                               } else {
+                                       fail("KafkaShuffleElement is either 
record or watermark");
+                               }
+                       }
+
+                       // According to the setting how watermarks are 
generated in this ITTest,
+                       // every producer task emits a watermark corresponding 
to each record + the end-of-event-time watermark.
+                       // Hence each producer sub task generates 
`numElementsPerProducer + 1` watermarks.
+                       // Each producer sub task broadcasts these 
`numElementsPerProducer + 1` watermarks to all partitions.
+                       // Thus in total, each producer sub task emits 
`(numElementsPerProducer + 1) * numberOfPartitions` watermarks.
+                       // From the consumer side, each partition receives 
`(numElementsPerProducer + 1) * producerParallelism` watermarks,
+                       // with each producer sub task produces 
`numElementsPerProducer + 1` watermarks.
+                       // Besides, watermarks from the same producer sub task 
should keep in order.
+                       for (List<KafkaShuffleWatermark> subTaskWatermarks : 
watermarks.values()) {
+                               int index = 0;
+                               Assert.assertEquals(numElementsPerProducer + 1, 
subTaskWatermarks.size());
+                               for (KafkaShuffleWatermark watermark : 
subTaskWatermarks) {
+                                       if (index == numElementsPerProducer) {
+                                               // the last element is the 
watermark that signifies end-of-event-time
+                                               
Assert.assertEquals(watermark.getWatermark(), 
Watermark.MAX_WATERMARK.getTimestamp());
+                                       } else {
+                                               
Assert.assertEquals(watermark.getWatermark(), INIT_TIMESTAMP + index++);
+                                       }
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Schema: (key, timestamp, source instance Id).
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1
+        * To test no data is lost or duplicated end-2-end
+        */
+       private void testKafkaShuffle(
+                       String prefix, int numElementsPerProducer, 
TimeCharacteristic timeCharacteristic) throws Exception {
+               String topic = topic(prefix, timeCharacteristic);
+               final int numberOfPartitions = 1;
+               final int producerParallelism = 1;
+
+               createTestTopic(topic, numberOfPartitions, 1);
+
+               final StreamExecutionEnvironment env = 
createEnvironment(producerParallelism, timeCharacteristic);
+               createKafkaShuffle(
+                       env, topic, numElementsPerProducer, 
producerParallelism, timeCharacteristic, numberOfPartitions)

Review comment:
       Chop down the arguments (one per line).

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
##########
@@ -35,9 +35,9 @@
 
        private final SinkTransformation<T> transformation;
 
-       @SuppressWarnings("unchecked")
        protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> 
operator) {
-               this.transformation = new 
SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, 
inputStream.getExecutionEnvironment().getParallelism());

Review comment:
       Unrelated changes; please revert them, or pull them into a separate 
hotfix if you deem it necessary (I don't).

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair 
together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+       static final String PRODUCER_PARALLELISM = "producer parallelism";
+       static final String PARTITION_NUMBER = "partition number";
+
+       /**
+        * Write to and read from a kafka shuffle with the partition decided by 
keys.
+        * Consumers should read partitions equal to the key group indices they 
are assigned.
+        * The number of partitions is the maximum parallelism of the receiving 
operator.
+        * This version only supports numberOfPartitions = consumerParallelism.

Review comment:
       Add a description of when `producerParallelism` should be != 
`numberOfPartitions`, as it's non-trivial to decide.
   Also, is there any support for just going with the default degree of 
parallelism, such that it can be changed through configs?
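   A rough sketch of the kind of fallback I have in mind, not a proposal for the 
final API. The two keys are the constants from this class; 
`ShuffleParallelismSketch`, `resolve`, and the idea of passing the 
environment's parallelism as `defaultValue` are assumptions for illustration.

```java
import java.util.Properties;

/** Hypothetical helper: read a parallelism-like setting, falling back to a default. */
final class ShuffleParallelismSketch {
    // Same keys as the constants declared in FlinkKafkaShuffle in this PR.
    static final String PRODUCER_PARALLELISM = "producer parallelism";
    static final String PARTITION_NUMBER = "partition number";

    /**
     * Returns the configured value for {@code key}, or {@code defaultValue}
     * (for example the environment's default parallelism) if the key is absent.
     */
    static int resolve(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        if (raw == null) {
            return defaultValue;
        }
        int value = Integer.parseInt(raw.trim());
        if (value <= 0) {
            throw new IllegalArgumentException(key + " must be positive, got " + value);
        }
        return value;
    }
}
```

   With something like this, a job that sets neither key would follow whatever 
default parallelism is configured.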

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import 
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleElement;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleElementDeserializer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleRecord;
+import 
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleWatermark;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.IngestionTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.fail;
+
+/**
+ * Simple End to End Test for Kafka.
+ */
+public class KafkaShuffleITCase extends KafkaShuffleBase {
+
+       @Rule
+       public final Timeout timeout = Timeout.millis(600000L);
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test no data is lost or duplicated end-2-end with the default 
time characteristic: ProcessingTime
+        */
+
+       @Test
+       public void testSimpleProcessingTime() throws Exception {
+               testKafkaShuffle("test_simple", 200000, ProcessingTime);
+       }
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test no data is lost or duplicated end-2-end with time 
characteristic: IngestionTime
+        */
+       @Test
+       public void testSimpleIngestionTime() throws Exception {
+               testKafkaShuffle("test_simple", 200000, IngestionTime);
+       }
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test no data is lost or duplicated end-2-end with time 
characteristic: EventTime
+        */
+       @Test
+       public void testSimpleEventTime() throws Exception {
+               testKafkaShuffle("test_simple", 100000, EventTime);
+       }
+
+       /**
+        * Producer Parallelism = 2; Kafka Partition # = 3; Consumer 
Parallelism = 3.
+        * To test data is partitioned to the right partition with time 
characteristic: ProcessingTime
+        */
+       @Test
+       public void testAssignedToPartitionProcessingTime() throws Exception {
+               testAssignedToPartition("test_assigned_to_partition", 300000, 
ProcessingTime);
+       }
+
+       /**
+        * Producer Parallelism = 2; Kafka Partition # = 3; Consumer 
Parallelism = 3.
+        * To test data is partitioned to the right partition with time 
characteristic: IngestionTime
+        */
+       @Test
+       public void testAssignedToPartitionIngestionTime() throws Exception {
+               testAssignedToPartition("test_assigned_to_partition", 300000, 
IngestionTime);
+       }
+
+       /**
+        * Producer Parallelism = 2; Kafka Partition # = 3; Consumer 
Parallelism = 3.
+        * To test data is partitioned to the right partition with time 
characteristic: EventTime
+        */
+       @Test
+       public void testAssignedToPartitionEventTime() throws Exception {
+               testAssignedToPartition("test_assigned_to_partition", 100000, 
EventTime);
+       }
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test value serialization and deserialization with time 
characteristic: ProcessingTime
+        */
+       @Test
+       public void testSerDeProcessingTime() throws Exception {
+               testRecordSerDe(ProcessingTime);
+       }
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test value and watermark serialization and deserialization with 
time characteristic: IngestionTime
+        */
+       @Test
+       public void testSerDeIngestionTime() throws Exception {
+               testRecordSerDe(IngestionTime);
+       }
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test value and watermark serialization and deserialization with 
time characteristic: EventTime
+        */
+       @Test
+       public void testSerDeEventTime() throws Exception {
+               testRecordSerDe(EventTime);
+       }
+
+       /**
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1.
+        * To test value and watermark serialization and deserialization with 
time characteristic: EventTime
+        */
+       @Test
+       public void testWatermarkBroadcasting() throws Exception {
+               final int numberOfPartitions = 3;
+               final int producerParallelism = 2;
+               final int numElementsPerProducer = 1000;
+
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               Map<Integer, Collection<ConsumerRecord<byte[], byte[]>>> 
results = testKafkaShuffleProducer(
+                       topic("test_watermark_broadcast", EventTime),
+                       env,
+                       numberOfPartitions,
+                       producerParallelism,
+                       numElementsPerProducer,
+                       EventTime);
+               TypeSerializer<Tuple3<Integer, Long, Integer>> typeSerializer = 
createTypeSerializer(env);
+               KafkaShuffleElementDeserializer<Tuple3<Integer, Long, Integer>> 
deserializer = new KafkaShuffleElementDeserializer<>();
+
+               // Records in a single partition are kept in order
+               for (int p = 0; p < numberOfPartitions; p++) {
+                       Collection<ConsumerRecord<byte[], byte[]>> records = 
results.get(p);
+                       Map<Integer, List<KafkaShuffleWatermark>> watermarks = 
new HashMap<>();
+
+                       for (ConsumerRecord<byte[], byte[]> consumerRecord : 
records) {
+                               Assert.assertNull(consumerRecord.key());
+                               KafkaShuffleElement<Tuple3<Integer, Long, 
Integer>> element =
+                                       
deserializer.deserialize(typeSerializer, consumerRecord);
+                               if (element.isRecord()) {
+                                       KafkaShuffleRecord<Tuple3<Integer, 
Long, Integer>> record = element.asRecord();
+                                       
Assert.assertEquals(record.getValue().f1.longValue(), INIT_TIMESTAMP + 
record.getValue().f0);
+                                       
Assert.assertEquals(record.getTimestamp().longValue(), 
record.getValue().f1.longValue());
+                               } else if (element.isWatermark()) {
+                                       KafkaShuffleWatermark watermark = 
element.asWatermark();
+                                       
watermarks.computeIfAbsent(watermark.getSubtask(), k -> new ArrayList<>());
+                                       
watermarks.get(watermark.getSubtask()).add(watermark);
+                               } else {
+                                       fail("KafkaShuffleElement is either 
record or watermark");
+                               }
+                       }
+
+                       // According to the setting how watermarks are 
generated in this ITTest,
+                       // every producer task emits a watermark corresponding 
to each record + the end-of-event-time watermark.
+                       // Hence each producer sub task generates 
`numElementsPerProducer + 1` watermarks.
+                       // Each producer sub task broadcasts these 
`numElementsPerProducer + 1` watermarks to all partitions.
+                       // Thus in total, each producer sub task emits 
`(numElementsPerProducer + 1) * numberOfPartitions` watermarks.
+                       // From the consumer side, each partition receives 
`(numElementsPerProducer + 1) * producerParallelism` watermarks,
+                       // with each producer sub task produces 
`numElementsPerProducer + 1` watermarks.
+                       // Besides, watermarks from the same producer sub task 
should keep in order.
+                       for (List<KafkaShuffleWatermark> subTaskWatermarks : 
watermarks.values()) {
+                               int index = 0;
+                               Assert.assertEquals(numElementsPerProducer + 1, 
subTaskWatermarks.size());
+                               for (KafkaShuffleWatermark watermark : 
subTaskWatermarks) {
+                                       if (index == numElementsPerProducer) {
+                                               // the last element is the 
watermark that signifies end-of-event-time
+                                               
Assert.assertEquals(watermark.getWatermark(), 
Watermark.MAX_WATERMARK.getTimestamp());
+                                       } else {
+                                               
Assert.assertEquals(watermark.getWatermark(), INIT_TIMESTAMP + index++);
+                                       }
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Schema: (key, timestamp, source instance Id).
+        * Producer Parallelism = 1; Kafka Partition # = 1; Consumer 
Parallelism = 1
+        * To test no data is lost or duplicated end-2-end
+        */
+       private void testKafkaShuffle(
+                       String prefix, int numElementsPerProducer, 
TimeCharacteristic timeCharacteristic) throws Exception {

Review comment:
       Chop down the parameter list: one parameter per line.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static 
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition> 
{
+       private static final Logger LOG = 
LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+       private final WatermarkHandler watermarkHandler;
+       // 
------------------------------------------------------------------------
+
+       /** The schema to convert between Kafka's byte messages, and Flink's 
objects. */
+       private final KafkaShuffleElementDeserializer<T> deserializer;
+
+       /** Serializer to serialize record. */
+       private final TypeSerializer<T> serializer;
+
+       /** The handover of data and exceptions between the consumer thread and 
the task thread. */
+       private final Handover handover;
+
+       /** The thread that runs the actual KafkaConsumer and hand the record 
batches to this fetcher. */
+       private final KafkaConsumerThread consumerThread;
+
+       /** Flag to mark the main work loop as alive. */
+       private volatile boolean running = true;
+
+       public KafkaShuffleFetcher(
+                       SourceFunction.SourceContext<T> sourceContext,
+                       Map<KafkaTopicPartition, Long> 
assignedPartitionsWithInitialOffsets,
+                       SerializedValue<AssignerWithPeriodicWatermarks<T>> 
watermarksPeriodic,
+                       SerializedValue<AssignerWithPunctuatedWatermarks<T>> 
watermarksPunctuated,
+                       ProcessingTimeService processingTimeProvider,
+                       long autoWatermarkInterval,
+                       ClassLoader userCodeClassLoader,
+                       String taskNameWithSubtasks,
+                       TypeSerializer<T> serializer,
+                       Properties kafkaProperties,
+                       long pollTimeout,
+                       MetricGroup subtaskMetricGroup,
+                       MetricGroup consumerMetricGroup,
+                       boolean useMetrics,
+                       int producerParallelism) throws Exception {
+               super(
+                       sourceContext,
+                       assignedPartitionsWithInitialOffsets,
+                       watermarksPeriodic,
+                       watermarksPunctuated,
+                       processingTimeProvider,
+                       autoWatermarkInterval,
+                       userCodeClassLoader,
+                       consumerMetricGroup,
+                       useMetrics);
+               this.deserializer = new KafkaShuffleElementDeserializer<>();
+               this.serializer = serializer;
+               this.handover = new Handover();
+               this.consumerThread = new KafkaConsumerThread(
+                       LOG,
+                       handover,
+                       kafkaProperties,
+                       unassignedPartitionsQueue,
+                       getFetcherName() + " for " + taskNameWithSubtasks,
+                       pollTimeout,
+                       useMetrics,
+                       consumerMetricGroup,
+                       subtaskMetricGroup);
+               this.watermarkHandler = new 
WatermarkHandler(producerParallelism);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Fetcher work methods
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void runFetchLoop() throws Exception {
+               try {
+                       final Handover handover = this.handover;
+
+                       // kick off the actual Kafka consumer
+                       consumerThread.start();
+
+                       while (running) {
+                               // this blocks until we get the next records
+                               // it automatically re-throws exceptions 
encountered in the consumer thread
+                               final ConsumerRecords<byte[], byte[]> records = 
handover.pollNext();
+
+                               // get the records for each topic partition
+                               for (KafkaTopicPartitionState<TopicPartition> 
partition : subscribedPartitionStates()) {
+                                       List<ConsumerRecord<byte[], byte[]>> 
partitionRecords =
+                                               
records.records(partition.getKafkaPartitionHandle());
+
+                                       for (ConsumerRecord<byte[], byte[]> 
record : partitionRecords) {
+                                               final KafkaShuffleElement<T> 
element = deserializer.deserialize(serializer, record);
+
+                                               // TODO: do we need to check 
the end of stream if reaching the end watermark?
+
+                                               if (element.isRecord()) {
+                                                       // timestamp is 
inherent from upstream
+                                                       // If using 
ProcessTime, timestamp is going to be ignored (upstream does not include 
timestamp as well)
+                                                       // If using 
IngestionTime, timestamp is going to be overwritten
+                                                       // If using EventTime, 
timestamp is going to be used
+                                                       synchronized 
(checkpointLock) {
+                                                               
KafkaShuffleRecord<T> elementAsRecord = element.asRecord();
+                                                               
sourceContext.collectWithTimestamp(
+                                                                       
elementAsRecord.value,
+                                                                       
elementAsRecord.timestamp == null ? record.timestamp() : 
elementAsRecord.timestamp);
+                                                               
partition.setOffset(record.offset());
+                                                       }
+                                               } else if 
(element.isWatermark()) {
+                                                       final 
KafkaShuffleWatermark watermark = element.asWatermark();
+                                                       Optional<Watermark> 
newWatermark = watermarkHandler.checkAndGetNewWatermark(watermark);
+                                                       
newWatermark.ifPresent(sourceContext::emitWatermark);
+                                               }
+                                       }
+                               }
+                       }
+               }
+               finally {
+                       // this signals the consumer thread that no more work 
is to be done
+                       consumerThread.shutdown();
+               }
+
+               // on a clean exit, wait for the runner thread
+               try {
+                       consumerThread.join();
+               }
+               catch (InterruptedException e) {
+                       // may be the result of a wake-up interruption after an 
exception.
+                       // we ignore this here and only restore the 
interruption state
+                       Thread.currentThread().interrupt();
+               }
+       }
+
+       @Override
+       public void cancel() {
+               // flag the main thread to exit. A thread interrupt will come 
anyways.
+               running = false;
+               handover.close();
+               consumerThread.shutdown();
+       }
+
+       @Override
+       protected TopicPartition createKafkaPartitionHandle(KafkaTopicPartition 
partition) {
+               return new TopicPartition(partition.getTopic(), 
partition.getPartition());
+       }
+
+       @Override
+       protected void doCommitInternalOffsetsToKafka(
+                       Map<KafkaTopicPartition, Long> offsets,
+                       @Nonnull KafkaCommitCallback commitCallback) throws 
Exception {
+               @SuppressWarnings("unchecked")
+               List<KafkaTopicPartitionState<TopicPartition>> partitions = 
subscribedPartitionStates();
+
+               Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new 
HashMap<>(partitions.size());
+
+               for (KafkaTopicPartitionState<TopicPartition> partition : 
partitions) {
+                       Long lastProcessedOffset = 
offsets.get(partition.getKafkaTopicPartition());
+                       if (lastProcessedOffset != null) {
+                               checkState(lastProcessedOffset >= 0, "Illegal 
offset value to commit");
+
+                               // committed offsets through the KafkaConsumer 
need to be 1 more than the last processed offset.
+                               // This does not affect Flink's 
checkpoints/saved state.
+                               long offsetToCommit = lastProcessedOffset + 1;
+
+                               
offsetsToCommit.put(partition.getKafkaPartitionHandle(), new 
OffsetAndMetadata(offsetToCommit));
+                               partition.setCommittedOffset(offsetToCommit);
+                       }
+               }
+
+               // record the work to be committed by the main consumer thread 
and make sure the consumer notices that
+               consumerThread.setOffsetsToCommit(offsetsToCommit, 
commitCallback);
+       }
+
+       private String getFetcherName() {
+               return "Kafka Shuffle Fetcher";
+       }
+
+       /**
+        * An element in a KafkaShuffle. Can be a record or a Watermark.
+        */
+       @VisibleForTesting
+       public abstract static class KafkaShuffleElement<T> {
+
+               public boolean isRecord() {
+                       return getClass() == KafkaShuffleRecord.class;
+               }
+
+               public boolean isWatermark() {
+                       return getClass() == KafkaShuffleWatermark.class;
+               }
+
+               public KafkaShuffleRecord<T> asRecord() {
+                       return (KafkaShuffleRecord<T>) this;
+               }
+
+               public KafkaShuffleWatermark asWatermark() {
+                       return (KafkaShuffleWatermark) this;
+               }
+       }
+
+       /**
+        * A watermark element in a KafkaShuffle. It includes
+        * - subtask index where the watermark is coming from
+        * - watermark timestamp
+        */
+       @VisibleForTesting
+       public static class KafkaShuffleWatermark<T> extends 
KafkaShuffleElement<T> {
+               final int subtask;
+               final long watermark;
+
+               KafkaShuffleWatermark(int subtask, long watermark) {
+                       this.subtask = subtask;
+                       this.watermark = watermark;
+               }
+
+               public int getSubtask() {
+                       return subtask;
+               }
+
+               public long getWatermark() {
+                       return watermark;
+               }
+       }
+
+       /**
+        * One value with Type T in a KafkaShuffle. This stores the value and 
an optional associated timestamp.
+        */
+       @VisibleForTesting
+       public static class KafkaShuffleRecord<T> extends 
KafkaShuffleElement<T> {
+               final T value;
+               final Long timestamp;
+
+               KafkaShuffleRecord(T value) {
+                       this.value = value;
+                       this.timestamp = null;
+               }
+
+               KafkaShuffleRecord(long timestamp, T value) {
+                       this.value = value;
+                       this.timestamp = timestamp;
+               }
+
+               public T getValue() {
+                       return value;
+               }
+
+               public Long getTimestamp() {
+                       return timestamp;
+               }
+       }
+
+       /**
+        * Deserializer for KafkaShuffleElement.
+        * @param <T>
+        */
+       @VisibleForTesting
+       public static class KafkaShuffleElementDeserializer<T> implements 
Serializable {
+               private transient DataInputDeserializer dis;
+
+               @VisibleForTesting
+               public KafkaShuffleElementDeserializer() {
+                       this.dis = new DataInputDeserializer();
+               }
+
+               @VisibleForTesting
+               public KafkaShuffleElement<T> deserialize(TypeSerializer<T> 
serializer, ConsumerRecord<byte[], byte[]> record)
+                       throws Exception {
+                       byte[] value = record.value();
+                       dis.setBuffer(value);
+                       int tag = IntSerializer.INSTANCE.deserialize(dis);
+
+                       if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+                               return new 
KafkaShuffleRecord<>(serializer.deserialize(dis));
+                       } else if (tag == TAG_REC_WITH_TIMESTAMP) {
+                               return new 
KafkaShuffleRecord<>(LongSerializer.INSTANCE.deserialize(dis), 
serializer.deserialize(dis));
+                       } else if (tag == TAG_WATERMARK) {
+                               return new KafkaShuffleWatermark<>(
+                                       
IntSerializer.INSTANCE.deserialize(dis), 
LongSerializer.INSTANCE.deserialize(dis));
+                       }
+
+                       throw new UnsupportedOperationException("Unsupported 
tag format");
+               }
+       }
+
+       /**
+        * WatermarkHandler to generate watermarks.
+        */
+       private static class WatermarkHandler {
+               private final int producerParallelism;
+               private final Map<Integer, Long> subtaskWatermark;
+
+               private long currentMinWatermark = Long.MIN_VALUE;
+
+               WatermarkHandler(int numberOfSubtask) {

Review comment:
       The parameter name `numberOfSubtask` is very confusing. It should also 
be `producerParallelism`, matching the field.
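   To show why the name matters, here is a simplified sketch with the rename 
applied. The rest of the handler is not visible in this hunk, so the logic 
below is my assumption of the usual approach: track the latest watermark per 
producer subtask and emit the minimum once all producers have reported. 
`WatermarkHandlerSketch` is a hypothetical stand-in that uses plain `long` 
values instead of `KafkaShuffleWatermark` and `Watermark`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for WatermarkHandler; the body is an assumption, not the PR's code. */
final class WatermarkHandlerSketch {
    private final int producerParallelism;
    private final Map<Integer, Long> subtaskWatermark = new HashMap<>();

    private long currentMinWatermark = Long.MIN_VALUE;

    // The parameter is named after what it means: the number of upstream producer subtasks.
    WatermarkHandlerSketch(int producerParallelism) {
        if (producerParallelism <= 0) {
            throw new IllegalArgumentException("producerParallelism must be positive");
        }
        this.producerParallelism = producerParallelism;
    }

    /** Returns the new minimum watermark across all producers if it advanced, else empty. */
    Optional<Long> checkAndGetNewWatermark(int subtask, long watermark) {
        // Keep the highest watermark seen so far for this producer subtask.
        subtaskWatermark.merge(subtask, watermark, Math::max);

        // Until every producer subtask has reported, no global minimum is known.
        if (subtaskWatermark.size() < producerParallelism) {
            return Optional.empty();
        }

        long min = Collections.min(subtaskWatermark.values());
        if (min > currentMinWatermark) {
            currentMinWatermark = min;
            return Optional.of(min);
        }
        return Optional.empty();
    }
}
```

   The `size() < producerParallelism` check only reads naturally when the 
constructor argument carries the same name.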

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+       static final String PRODUCER_PARALLELISM = "producer parallelism";
+       static final String PARTITION_NUMBER = "partition number";
+
+       /**
+        * Write to and read from a kafka shuffle with the partition decided by keys.
+        * Consumers should read partitions equal to the key group indices they are assigned.
+        * The number of partitions is the maximum parallelism of the receiving operator.
+        * This version only supports numberOfPartitions = consumerParallelism.
+        *
+        * @param inputStream input stream to the kafka
+        * @param topic kafka topic
+        * @param producerParallelism parallelism of producer
+        * @param numberOfPartitions number of partitions
+        * @param properties Kafka properties
+        * @param fields key positions from inputStream
+        * @param <T> input type
+        */
+       public static <T> KeyedStream<T, Tuple> persistentKeyBy(
+                       DataStream<T> inputStream,
+                       String topic,
+                       int producerParallelism,
+                       int numberOfPartitions,
+                       Properties properties,
+                       int... fields) {
+               return persistentKeyBy(
+                       inputStream, topic, producerParallelism, numberOfPartitions, properties, keySelector(inputStream, fields));
+       }
+
+       /**
+        * Write to and read from a kafka shuffle with the partition decided by keys.
+        * Consumers should read partitions equal to the key group indices they are assigned.
+        * The number of partitions is the maximum parallelism of the receiving operator.
+        * This version only supports numberOfPartitions = consumerParallelism.
+        *
+        * @param inputStream input stream to the kafka
+        * @param topic kafka topic
+        * @param producerParallelism parallelism of producer
+        * @param numberOfPartitions number of partitions
+        * @param properties Kafka properties
+        * @param keySelector key(K) based on inputStream(T)
+        * @param <T> input type
+        * @param <K> key type
+        */
+       public static <T, K> KeyedStream<T, K> persistentKeyBy(
+                       DataStream<T> inputStream,
+                       String topic,
+                       int producerParallelism,
+                       int numberOfPartitions,
+                       Properties properties,
+                       KeySelector<T, K> keySelector) {
+               // KafkaProducer#propsToMap uses Properties purely as a HashMap without considering the default properties
+               // So we have to flatten the default property to first level elements.
+               Properties kafkaProperties = PropertiesUtil.flatten(properties);
+               kafkaProperties.setProperty(PRODUCER_PARALLELISM, String.valueOf(producerParallelism));
+               kafkaProperties.setProperty(PARTITION_NUMBER, String.valueOf(numberOfPartitions));
+
+               StreamExecutionEnvironment env = inputStream.getExecutionEnvironment();
+               TypeInformationSerializationSchema<T> schema =
+                       new TypeInformationSerializationSchema<>(inputStream.getType(), env.getConfig());
+
+               writeKeyBy(inputStream, topic, kafkaProperties, keySelector);
+               return readKeyBy(topic, env, schema, kafkaProperties, keySelector);
+       }
+
+       /**
+        * Write to a kafka shuffle with the partition decided by keys.
+        * @param inputStream input stream to the kafka
+        * @param topic kafka topic
+        * @param kafkaProperties kafka properties
+        * @param fields key positions from inputStream
+        * @param <T> input type
+        */
+       public static <T> void writeKeyBy(
+                       DataStream<T> inputStream,
+                       String topic,
+                       Properties kafkaProperties,
+                       int... fields) {
+               writeKeyBy(inputStream, topic, kafkaProperties, keySelector(inputStream, fields));
+       }
+
+       /**
+        * Write to a kafka shuffle with the partition decided by keys.
+        * @param inputStream input stream to the kafka
+        * @param topic kafka topic
+        * @param kafkaProperties kafka properties
+        * @param keySelector key(K) based on input(T)
+        * @param <T> input type
+        * @param <K> key type
+        */
+       public static <T, K> void writeKeyBy(
+                       DataStream<T> inputStream,
+                       String topic,
+                       Properties kafkaProperties,
+                       KeySelector<T, K> keySelector) {
+               StreamExecutionEnvironment env = inputStream.getExecutionEnvironment();
+               TypeInformationSerializationSchema<T> schema =
+                       new TypeInformationSerializationSchema<>(inputStream.getType(), env.getConfig());
+
+               // write data to Kafka
+               FlinkKafkaShuffleProducer<T, K> kafkaProducer = new FlinkKafkaShuffleProducer<>(
+                       topic,
+                       schema,
+                       kafkaProperties,
+                       env.clean(keySelector),
+                       FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
+                       FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+
+               // make sure the sink parallelism is set to producerParallelism
+               Preconditions.checkArgument(
+                       kafkaProperties.getProperty(PRODUCER_PARALLELISM) != null,
+                       "Missing producer parallelism for Kafka Shuffle");
+               int producerParallelism = PropertiesUtil.getInt(kafkaProperties, PRODUCER_PARALLELISM, Integer.MIN_VALUE);
+
+               addKafkaShuffle(inputStream, kafkaProducer, producerParallelism);
+       }
+
+       /**
+        * Read data from a Kafka Shuffle.
+        * Consumers should read partitions equal to the key group indices they are assigned.
+        * The number of partitions is the maximum parallelism of the receiving operator.
+        * This version only supports numberOfPartitions = consumerParallelism
+        *
+        * @param topic kafka topic
+        * @param env streaming execution environment. readKeyBy environment can be different from writeKeyBy
+        * @param schema the record schema to read
+        * @param kafkaProperties kafka properties
+        * @param keySelector key(K) based on schema(T)
+        * @param <T> schema type
+        * @param <K> key type
+        * @return keyed data stream
+        */
+       public static <T, K> KeyedStream<T, K> readKeyBy(
+                       String topic,
+                       StreamExecutionEnvironment env,
+                       TypeInformationSerializationSchema<T> schema,
+                       Properties kafkaProperties,
+                       KeySelector<T, K> keySelector) {
+               SourceFunction<T> kafkaConsumer = new FlinkKafkaShuffleConsumer<>(topic, schema, kafkaProperties);
+
+               // TODO: consider situations where numberOfPartitions != consumerParallelism
+               Preconditions.checkArgument(
+                       kafkaProperties.getProperty(PARTITION_NUMBER) != null,
+                       "Missing partition number for Kafka Shuffle");
+               int numberOfPartitions = PropertiesUtil.getInt(kafkaProperties, PARTITION_NUMBER, Integer.MIN_VALUE);
+               DataStream<T> outputDataStream = env.addSource(kafkaConsumer).setParallelism(numberOfPartitions);
+
+               return DataStreamUtils.reinterpretAsKeyedStream(outputDataStream, keySelector);
+       }
+
+       /**
+        * Add a {@link StreamKafkaShuffleSink} to {@link DataStream}.
+        * {@link StreamKafkaShuffleSink} is associated a {@link FlinkKafkaShuffleProducer}.
+        *
+        * @param inputStream the input data stream connected to the shuffle
+        * @param kafkaShuffleProducer kafka shuffle sink function that can handle both records and watermark
+        * @param producerParallelism the number of tasks writing to the kafka shuffle
+        */
+       private static <T, K> void addKafkaShuffle(
+                       DataStream<T> inputStream, FlinkKafkaShuffleProducer<T, K> kafkaShuffleProducer, int producerParallelism) {
+
+               // read the output type of the input Transform to coax out errors about MissingTypeInfo
+               inputStream.getTransformation().getOutputType();
+
+               StreamKafkaShuffleSink<T> shuffleSinkOperator = new StreamKafkaShuffleSink<>(kafkaShuffleProducer);
+               SinkTransformation<T> transformation = new SinkTransformation<>(
+                       inputStream.getTransformation(),
+                       "kafka_shuffle",
+                       shuffleSinkOperator,
+                       inputStream.getExecutionEnvironment().getParallelism());
+               inputStream.getExecutionEnvironment().addOperator(transformation);
+               transformation.setParallelism(producerParallelism);
+       }
+
+       // A better place to put this function is DataStream; but put it here for now to avoid changing DataStream

Review comment:
       I wouldn't mind moving it to `DataStream`, as this only changes internals.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

