AHeise commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r426275573
##########
File path:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
##########
@@ -264,7 +264,7 @@ public FlinkKafkaConsumerBase(
* @param properties - Kafka configuration properties to be adjusted
* @param offsetCommitMode offset commit mode
Review comment:
I was referring to "Kafka Shuffle Consumer Part" in the commit message.
It could be symmetric to the producer part.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
##########
@@ -192,6 +176,29 @@ protected String getFetcherName() {
return "Kafka Fetcher";
}
+	protected void partitionConsumerRecordsHandler(
+			List<ConsumerRecord<byte[], byte[]>> partitionRecords,
+			KafkaTopicPartitionState<TopicPartition> partition) throws Exception {
+
+		for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+			deserializer.deserialize(record, kafkaCollector);
+
+			// emit the actual records. this also updates offset state atomically and emits
+			// watermarks
+			emitRecordsWithTimestamps(
+				kafkaCollector.getRecords(),
+				partition,
+				record.offset(),
+				record.timestamp());
+
+			if (kafkaCollector.isEndOfStreamSignalled()) {
+				// end of stream signaled
+				running = false;
+				break;
+			}
Review comment:
I don't see the issue; automatic IntelliJ extraction already helps.
```
while (running) {
	...
	for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
		if (handleRecord(partition, record)) {
			running = false;
			break;
		}
	}
}
```
and
```
protected boolean handleRecord(
		KafkaTopicPartitionState<TopicPartition> partition,
		ConsumerRecord<byte[], byte[]> record) throws Exception {
	deserializer.deserialize(record, kafkaCollector);

	// emit the actual records. this also updates offset state atomically and emits
	// watermarks
	emitRecordsWithTimestamps(
		kafkaCollector.getRecords(),
		partition,
		record.offset(),
		record.timestamp());

	return kafkaCollector.isEndOfStreamSignalled();
}
```
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and
persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ * - you would like to reuse the shuffle data and/or,
+ * - you would like to avoid a full restart of a pipeline during failure
recovery
+ *
+ * <p>Persisting shuffle is achieved by wrapping a {@link
FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link
FlinkKafkaShuffle}.
+ * Here is an example how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ * StreamExecutionEnvironment env = ...
// create execution environment
+ * DataStream<X> source = env.addSource(...)
// add data stream source
+ * DataStream<Y> dataStream = ...
// some transformation(s) based on source
+ *
+ * KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ * .persistentKeyBy(
// keyBy shuffle through kafka
+ * dataStream,
// data stream to be shuffled
+ * topic,
// Kafka topic written to
+ * producerParallelism,
// the number of tasks of a Kafka Producer
+ * numberOfPartitions,
// the number of partitions of the Kafka topic written
to
+ * kafkaProperties,
// kafka properties for Kafka Producer and Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key from `dataStream'
+ *
+ * keyedStream.transform...
// some other transformation(s)
+ *
+ * KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ * .readKeyBy(
// Read the Kafka shuffle data again for other
usages
+ * topic,
// the topic of Kafka where data is persisted
+ * env,
// execution environment, and it can be a new
environment
+ * typeInformation<Y>,
// type information of the data persisted in Kafka
+ * kafkaProperties,
// kafka properties for Kafka Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key
+ *
+ * keyedStreamReuse.transform...
// some other transformation(s)
+ * }</pre>
+ *
+ * <p>Usage of {@link FlinkKafkaShuffle#persistentKeyBy} is similar to {@link
DataStream#keyBy(KeySelector)}.
+ * The differences are:
+ *
+ * <p>1). Partitioning is done through {@link FlinkKafkaShuffleProducer}.
{@link FlinkKafkaShuffleProducer} decides
+ * which partition a key goes when writing to Kafka
+ *
+ * <p>2). Shuffle data can be reused through {@link
FlinkKafkaShuffle#readKeyBy}, as shown in the example above.
+ *
+ * <p>3). Job execution is decoupled by the persistent Kafka message bus. In
the example, the job execution graph is
+ * decoupled to three regions: `KafkaShuffleProducer',
`KafkaShuffleConsumer' and `KafkaShuffleConsumerReuse'
+ * through `PERSISTENT DATA` as shown below. If any region
fails the execution, the other two keep progressing.
+ *
+ * <p><pre>
+ * source -> ... KafkaShuffleProducer -> PERSISTENT DATA ->
KafkaShuffleConsumer -> ...
+ * |
+ * | ---------->
KafkaShuffleConsumerReuse -> ...
+ * </pre>
+ */
+@Experimental
+public class FlinkKafkaShuffle {
+ static final String PRODUCER_PARALLELISM = "producer parallelism";
+ static final String PARTITION_NUMBER = "partition number";
+
+ /**
+ * Uses Kafka as a message bus to persist keyBy shuffle.
+ *
+ * <p>Persisting keyBy shuffle is achieved by wrapping a {@link
FlinkKafkaShuffleProducer} and
+ * {@link FlinkKafkaShuffleConsumer} together.
+ *
+ * <p>On the producer side, {@link FlinkKafkaShuffleProducer}
+ * is similar to {@link DataStream#keyBy(KeySelector)}. They use the
same key group assignment function
+ * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to
decide which partition a key goes.
+ * Hence, each producer task can potentially write to each Kafka
partition based on where the key goes.
+ * Here, `numberOfPartitions` equals to the key group size.
+ * In the case of using {@link TimeCharacteristic#EventTime}, each
producer task broadcasts its watermark
+ * to ALL of the Kafka partitions to make sure watermark information is
propagated correctly.
+ *
+ * <p>On the consumer side, each consumer task should read partitions
equal to the key group indices
+ * it is assigned. `numberOfPartitions` is the maximum parallelism of
the consumer. This version only
+ * supports numberOfPartitions = consumerParallelism.
+ * In the case of using {@link TimeCharacteristic#EventTime}, a
consumer task is responsible to emit
+ * watermarks. Watermarks are read from the corresponding Kafka
partitions. Notice that a consumer task only starts
+ * to emit a watermark after reading at least one watermark from each
producer task to make sure watermarks
+ * are monotonically increasing. Hence a consumer task needs to know
`producerParallelism` as well.
+ *
+ * @see FlinkKafkaShuffle#writeKeyBy
+ * @see FlinkKafkaShuffle#readKeyBy
+ *
+ * @param dataStream Data stream to be shuffled
+ * @param topic Kafka topic written to
+ * @param producerParallelism Parallelism of producer
+ * @param numberOfPartitions Number of partitions
+ * @param properties Kafka properties
+ * @param keySelector Key selector to retrieve key
from `dataStream'
+ * @param <T> Type of the input data
stream
+ * @param <K> Type of key
+ */
+ public static <T, K> KeyedStream<T, K> persistentKeyBy(
+ DataStream<T> dataStream,
+ String topic,
+ int producerParallelism,
+ int numberOfPartitions,
+ Properties properties,
+ KeySelector<T, K> keySelector) {
+ // KafkaProducer#propsToMap uses Properties purely as a HashMap
without considering the default properties
+ // So we have to flatten the default property to first level
elements.
+ Properties kafkaProperties = PropertiesUtil.flatten(properties);
+ kafkaProperties.setProperty(PRODUCER_PARALLELISM,
String.valueOf(producerParallelism));
+ kafkaProperties.setProperty(PARTITION_NUMBER,
String.valueOf(numberOfPartitions));
+
+ StreamExecutionEnvironment env =
dataStream.getExecutionEnvironment();
+
+ writeKeyBy(dataStream, topic, kafkaProperties, keySelector);
+ return readKeyBy(topic, env, dataStream.getType(),
kafkaProperties, keySelector);
+ }
+
+ /**
+ * Uses Kafka as a message bus to persist keyBy shuffle.
+ *
+ * @param dataStream Data stream to be shuffled
+ * @param topic Kafka topic written to
+ * @param producerParallelism Parallelism of producer
+ * @param numberOfPartitions Number of partitions
+ * @param properties Kafka properties
+ * @param fields Key positions from the
input data stream
+ * @param <T> Type of the input data
stream
+ */
+ public static <T> KeyedStream<T, Tuple> persistentKeyBy(
+ DataStream<T> dataStream,
+ String topic,
+ int producerParallelism,
+ int numberOfPartitions,
+ Properties properties,
+ int... fields) {
+ return persistentKeyBy(
+ dataStream,
+ topic,
+ producerParallelism,
+ numberOfPartitions,
+ properties,
+ keySelector(dataStream, fields));
+ }
+
+ /**
+ * The write side of {@link FlinkKafkaShuffle#persistentKeyBy}.
+ *
+ * <p>This function contains a {@link FlinkKafkaShuffleProducer} to
shuffle and persist data in Kafka.
+ * {@link FlinkKafkaShuffleProducer} uses the same key group assignment
function
+ * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to
decide which partition a key goes.
+ * Hence, each producer task can potentially write to each Kafka
partition based on the key.
+ * Here, the number of partitions equals to the key group size.
+ * In the case of using {@link TimeCharacteristic#EventTime}, each
producer task broadcasts each watermark
+ * to all of the Kafka partitions to make sure watermark information is
propagated properly.
+ *
+ * <p>Attention: make sure kafkaProperties include
+ * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} and {@link
FlinkKafkaShuffle#PARTITION_NUMBER} explicitly.
+ * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} is the parallelism of
the producer.
+ * {@link FlinkKafkaShuffle#PARTITION_NUMBER} is the number of
partitions.
+ * They are not necessarily the same and allowed to be set
independently.
+ *
+ * @see FlinkKafkaShuffle#persistentKeyBy
+ * @see FlinkKafkaShuffle#readKeyBy
Review comment:
These `@see` tags are usually added at the very top of a javadoc.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and
persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ * - you would like to reuse the shuffle data and/or,
+ * - you would like to avoid a full restart of a pipeline during failure
recovery
+ *
+ * <p>Persisting shuffle is achieved by wrapping a {@link
FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link
FlinkKafkaShuffle}.
+ * Here is an example how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ * StreamExecutionEnvironment env = ...
// create execution environment
+ * DataStream<X> source = env.addSource(...)
// add data stream source
+ * DataStream<Y> dataStream = ...
// some transformation(s) based on source
+ *
+ * KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ * .persistentKeyBy(
// keyBy shuffle through kafka
+ * dataStream,
// data stream to be shuffled
+ * topic,
// Kafka topic written to
+ * producerParallelism,
// the number of tasks of a Kafka Producer
+ * numberOfPartitions,
// the number of partitions of the Kafka topic written
to
+ * kafkaProperties,
// kafka properties for Kafka Producer and Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key from `dataStream'
+ *
+ * keyedStream.transform...
// some other transformation(s)
+ *
+ * KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ * .readKeyBy(
// Read the Kafka shuffle data again for other
usages
+ * topic,
// the topic of Kafka where data is persisted
+ * env,
// execution environment, and it can be a new
environment
+ * typeInformation<Y>,
// type information of the data persisted in Kafka
+ * kafkaProperties,
// kafka properties for Kafka Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key
+ *
+ * keyedStreamReuse.transform...
// some other transformation(s)
+ * }</pre>
+ *
+ * <p>Usage of {@link FlinkKafkaShuffle#persistentKeyBy} is similar to {@link
DataStream#keyBy(KeySelector)}.
+ * The differences are:
+ *
+ * <p>1). Partitioning is done through {@link FlinkKafkaShuffleProducer}.
{@link FlinkKafkaShuffleProducer} decides
+ * which partition a key goes when writing to Kafka
+ *
+ * <p>2). Shuffle data can be reused through {@link
FlinkKafkaShuffle#readKeyBy}, as shown in the example above.
+ *
+ * <p>3). Job execution is decoupled by the persistent Kafka message bus. In
the example, the job execution graph is
+ * decoupled to three regions: `KafkaShuffleProducer',
`KafkaShuffleConsumer' and `KafkaShuffleConsumerReuse'
+ * through `PERSISTENT DATA` as shown below. If any region
fails the execution, the other two keep progressing.
+ *
+ * <p><pre>
+ * source -> ... KafkaShuffleProducer -> PERSISTENT DATA ->
KafkaShuffleConsumer -> ...
+ * |
+ * | ---------->
KafkaShuffleConsumerReuse -> ...
+ * </pre>
+ */
+@Experimental
+public class FlinkKafkaShuffle {
+ static final String PRODUCER_PARALLELISM = "producer parallelism";
+ static final String PARTITION_NUMBER = "partition number";
+
+ /**
+ * Uses Kafka as a message bus to persist keyBy shuffle.
+ *
+ * <p>Persisting keyBy shuffle is achieved by wrapping a {@link
FlinkKafkaShuffleProducer} and
+ * {@link FlinkKafkaShuffleConsumer} together.
+ *
+ * <p>On the producer side, {@link FlinkKafkaShuffleProducer}
+ * is similar to {@link DataStream#keyBy(KeySelector)}. They use the
same key group assignment function
+ * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to
decide which partition a key goes.
+ * Hence, each producer task can potentially write to each Kafka
partition based on where the key goes.
+ * Here, `numberOfPartitions` equals to the key group size.
+ * In the case of using {@link TimeCharacteristic#EventTime}, each
producer task broadcasts its watermark
+ * to ALL of the Kafka partitions to make sure watermark information is
propagated correctly.
+ *
+ * <p>On the consumer side, each consumer task should read partitions
equal to the key group indices
+ * it is assigned. `numberOfPartitions` is the maximum parallelism of
the consumer. This version only
+ * supports numberOfPartitions = consumerParallelism.
+ * In the case of using {@link TimeCharacteristic#EventTime}, a
consumer task is responsible to emit
+ * watermarks. Watermarks are read from the corresponding Kafka
partitions. Notice that a consumer task only starts
+ * to emit a watermark after reading at least one watermark from each
producer task to make sure watermarks
+ * are monotonically increasing. Hence a consumer task needs to know
`producerParallelism` as well.
+ *
+ * @see FlinkKafkaShuffle#writeKeyBy
+ * @see FlinkKafkaShuffle#readKeyBy
+ *
+ * @param dataStream Data stream to be shuffled
+ * @param topic Kafka topic written to
+ * @param producerParallelism Parallelism of producer
+ * @param numberOfPartitions Number of partitions
+ * @param properties Kafka properties
+ * @param keySelector Key selector to retrieve key
from `dataStream'
+ * @param <T> Type of the input data
stream
+ * @param <K> Type of key
+ */
+ public static <T, K> KeyedStream<T, K> persistentKeyBy(
+ DataStream<T> dataStream,
+ String topic,
+ int producerParallelism,
+ int numberOfPartitions,
+ Properties properties,
+ KeySelector<T, K> keySelector) {
+ // KafkaProducer#propsToMap uses Properties purely as a HashMap
without considering the default properties
+ // So we have to flatten the default property to first level
elements.
+ Properties kafkaProperties = PropertiesUtil.flatten(properties);
+ kafkaProperties.setProperty(PRODUCER_PARALLELISM,
String.valueOf(producerParallelism));
+ kafkaProperties.setProperty(PARTITION_NUMBER,
String.valueOf(numberOfPartitions));
+
+ StreamExecutionEnvironment env =
dataStream.getExecutionEnvironment();
+
+ writeKeyBy(dataStream, topic, kafkaProperties, keySelector);
+ return readKeyBy(topic, env, dataStream.getType(),
kafkaProperties, keySelector);
+ }
+
+ /**
+ * Uses Kafka as a message bus to persist keyBy shuffle.
Review comment:
Copy the long explanation from the overload above.
##########
File path: flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
##########
@@ -19,6 +19,7 @@
import org.slf4j.Logger;
Review comment:
I don't think adding javadoc-style formatting to the commit message makes
much sense. Please remove it. Sorry for seeing it so late.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and
persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ * - you would like to reuse the shuffle data and/or,
+ * - you would like to avoid a full restart of a pipeline during failure
recovery
+ *
+ * <p>Persisting shuffle is achieved by wrapping a {@link
FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link
FlinkKafkaShuffle}.
+ * Here is an example how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ * StreamExecutionEnvironment env = ...
// create execution environment
+ * DataStream<X> source = env.addSource(...)
// add data stream source
+ * DataStream<Y> dataStream = ...
// some transformation(s) based on source
+ *
+ * KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ * .persistentKeyBy(
// keyBy shuffle through kafka
+ * dataStream,
// data stream to be shuffled
+ * topic,
// Kafka topic written to
+ * producerParallelism,
// the number of tasks of a Kafka Producer
+ * numberOfPartitions,
// the number of partitions of the Kafka topic written
to
+ * kafkaProperties,
// kafka properties for Kafka Producer and Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key from `dataStream'
+ *
+ * keyedStream.transform...
// some other transformation(s)
+ *
+ * KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ * .readKeyBy(
// Read the Kafka shuffle data again for other
usages
+ * topic,
// the topic of Kafka where data is persisted
+ * env,
// execution environment, and it can be a new
environment
+ * typeInformation<Y>,
// type information of the data persisted in Kafka
+ * kafkaProperties,
// kafka properties for Kafka Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key
+ *
+ * keyedStreamReuse.transform...
// some other transformation(s)
+ * }</pre>
+ *
+ * <p>Usage of {@link FlinkKafkaShuffle#persistentKeyBy} is similar to {@link
DataStream#keyBy(KeySelector)}.
+ * The differences are:
+ *
+ * <p>1). Partitioning is done through {@link FlinkKafkaShuffleProducer}.
{@link FlinkKafkaShuffleProducer} decides
+ * which partition a key goes when writing to Kafka
+ *
+ * <p>2). Shuffle data can be reused through {@link
FlinkKafkaShuffle#readKeyBy}, as shown in the example above.
+ *
+ * <p>3). Job execution is decoupled by the persistent Kafka message bus. In
the example, the job execution graph is
+ * decoupled to three regions: `KafkaShuffleProducer',
`KafkaShuffleConsumer' and `KafkaShuffleConsumerReuse'
+ * through `PERSISTENT DATA` as shown below. If any region
fails the execution, the other two keep progressing.
+ *
+ * <p><pre>
+ * source -> ... KafkaShuffleProducer -> PERSISTENT DATA ->
KafkaShuffleConsumer -> ...
+ * |
+ * | ---------->
KafkaShuffleConsumerReuse -> ...
Review comment:
Really good.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and
persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ * - you would like to reuse the shuffle data and/or,
+ * - you would like to avoid a full restart of a pipeline during failure
recovery
+ *
+ * <p>Persisting shuffle is achieved by wrapping a {@link
FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link
FlinkKafkaShuffle}.
+ * Here is an example how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ * StreamExecutionEnvironment env = ...
// create execution environment
+ * DataStream<X> source = env.addSource(...)
// add data stream source
+ * DataStream<Y> dataStream = ...
// some transformation(s) based on source
+ *
+ * KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ * .persistentKeyBy(
// keyBy shuffle through kafka
+ * dataStream,
// data stream to be shuffled
+ * topic,
// Kafka topic written to
+ * producerParallelism,
// the number of tasks of a Kafka Producer
+ * numberOfPartitions,
// the number of partitions of the Kafka topic written
to
+ * kafkaProperties,
// kafka properties for Kafka Producer and Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key from `dataStream'
+ *
+ * keyedStream.transform...
// some other transformation(s)
+ *
+ * KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ * .readKeyBy(
// Read the Kafka shuffle data again for other
usages
+ * topic,
// the topic of Kafka where data is persisted
+ * env,
// execution environment, and it can be a new
environment
+ * typeInformation<Y>,
// type information of the data persisted in Kafka
+ * kafkaProperties,
// kafka properties for Kafka Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key
+ *
+ * keyedStreamReuse.transform...
// some other transformation(s)
+ * }</pre>
Review comment:
Really good to have an example, but I'd replace the type tokens with some
real types.
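Something along these lines maybe (just a sketch; the concrete record type, topic name,
and Kafka settings are illustrative, not taken from this PR):
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// illustrative record type and source
DataStream<Tuple3<Integer, Long, String>> dataStream = env
	.fromElements(Tuple3.of(1, 1000L, "a"), Tuple3.of(2, 2000L, "b"));

Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");

KeySelector<Tuple3<Integer, Long, String>, Integer> keySelector = record -> record.f0;

// keyBy shuffle through Kafka (write side and read side wrapped together)
KeyedStream<Tuple3<Integer, Long, String>, Integer> keyedStream = FlinkKafkaShuffle
	.persistentKeyBy(
		dataStream,
		"my-shuffle-topic",   // Kafka topic written to
		4,                    // producerParallelism
		8,                    // numberOfPartitions
		kafkaProperties,
		keySelector);

// read the persisted shuffle data again for another usage
KeyedStream<Tuple3<Integer, Long, String>, Integer> keyedStreamReuse = FlinkKafkaShuffle
	.readKeyBy("my-shuffle-topic", env, dataStream.getType(), kafkaProperties, keySelector);
```
That would make it much easier for users to map the example onto their own pipelines.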
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition>
{
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+ private final WatermarkHandler watermarkHandler;
+ //
------------------------------------------------------------------------
+
+ /** The schema to convert between Kafka's byte messages, and Flink's
objects. */
+ private final KafkaShuffleElementDeserializer<T> deserializer;
+
+ /** Serializer to serialize record. */
+ private final TypeSerializer<T> serializer;
+
+ /** The handover of data and exceptions between the consumer thread and
the task thread. */
+ private final Handover handover;
+
+ /** The thread that runs the actual KafkaConsumer and hand the record
batches to this fetcher. */
+ private final KafkaConsumerThread consumerThread;
+
+ /** Flag to mark the main work loop as alive. */
+ private volatile boolean running = true;
+
+ public KafkaShuffleFetcher(
+ SourceFunction.SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long>
assignedPartitionsWithInitialOffsets,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>>
watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>>
watermarksPunctuated,
+ ProcessingTimeService processingTimeProvider,
+ long autoWatermarkInterval,
+ ClassLoader userCodeClassLoader,
+ String taskNameWithSubtasks,
+ TypeSerializer<T> serializer,
+ Properties kafkaProperties,
+ long pollTimeout,
+ MetricGroup subtaskMetricGroup,
+ MetricGroup consumerMetricGroup,
+ boolean useMetrics,
+ int producerParallelism) throws Exception {
+ super(
+ sourceContext,
+ assignedPartitionsWithInitialOffsets,
+ watermarksPeriodic,
+ watermarksPunctuated,
+ processingTimeProvider,
+ autoWatermarkInterval,
+ userCodeClassLoader,
+ consumerMetricGroup,
+ useMetrics);
+
+ this.deserializer = new KafkaShuffleElementDeserializer<>();
+ this.serializer = serializer;
+ this.handover = new Handover();
+ this.consumerThread = new KafkaConsumerThread(
+ LOG,
+ handover,
+ kafkaProperties,
+ unassignedPartitionsQueue,
+ getFetcherName() + " for " + taskNameWithSubtasks,
+ pollTimeout,
+ useMetrics,
+ consumerMetricGroup,
+ subtaskMetricGroup);
+ this.watermarkHandler = new
WatermarkHandler(producerParallelism);
+ }
+
+ //
------------------------------------------------------------------------
+ // Fetcher work methods
+ //
------------------------------------------------------------------------
+
+ @Override
+ public void runFetchLoop() throws Exception {
+ try {
+ final Handover handover = this.handover;
+
+ // kick off the actual Kafka consumer
+ consumerThread.start();
+
+ while (running) {
+ // this blocks until we get the next records
+ // it automatically re-throws exceptions
encountered in the consumer thread
+ final ConsumerRecords<byte[], byte[]> records =
handover.pollNext();
+
+ // get the records for each topic partition
+ for (KafkaTopicPartitionState<TopicPartition>
partition : subscribedPartitionStates()) {
+ List<ConsumerRecord<byte[], byte[]>>
partitionRecords =
+
records.records(partition.getKafkaPartitionHandle());
+
+ for (ConsumerRecord<byte[], byte[]>
record : partitionRecords) {
+ final KafkaShuffleElement<T>
element = deserializer.deserialize(serializer, record);
+
+ // TODO: do we need to check
the end of stream if reaching the end watermark?
+
+ if (element.isRecord()) {
+ // timestamp is
inherent from upstream
+ // If using
ProcessTime, timestamp is going to be ignored (upstream does not include
timestamp as well)
+ // If using
IngestionTime, timestamp is going to be overwritten
+ // If using EventTime,
timestamp is going to be used
+ synchronized
(checkpointLock) {
+
KafkaShuffleRecord<T> elementAsRecord = element.asRecord();
+
sourceContext.collectWithTimestamp(
+
elementAsRecord.value,
+
elementAsRecord.timestamp == null ? record.timestamp() :
elementAsRecord.timestamp);
+
partition.setOffset(record.offset());
+ }
+ } else if
(element.isWatermark()) {
+ final
KafkaShuffleWatermark watermark = element.asWatermark();
+ Optional<Watermark>
newWatermark = watermarkHandler.checkAndGetNewWatermark(watermark);
+
newWatermark.ifPresent(sourceContext::emitWatermark);
+ }
+ }
+ }
+ }
+ }
+ finally {
+ // this signals the consumer thread that no more work
is to be done
+ consumerThread.shutdown();
+ }
+
+ // on a clean exit, wait for the runner thread
+ try {
+ consumerThread.join();
+ }
+ catch (InterruptedException e) {
+ // may be the result of a wake-up interruption after an
exception.
+ // we ignore this here and only restore the
interruption state
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // flag the main thread to exit. A thread interrupt will come
anyways.
+ running = false;
+ handover.close();
+ consumerThread.shutdown();
+ }
+
+ @Override
+ protected TopicPartition createKafkaPartitionHandle(KafkaTopicPartition
partition) {
+ return new TopicPartition(partition.getTopic(),
partition.getPartition());
+ }
+
+ @Override
+ protected void doCommitInternalOffsetsToKafka(
+ Map<KafkaTopicPartition, Long> offsets,
+ @Nonnull KafkaCommitCallback commitCallback) throws Exception {
+
+ @SuppressWarnings("unchecked")
+ List<KafkaTopicPartitionState<TopicPartition>> partitions =
subscribedPartitionStates();
+
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new
HashMap<>(partitions.size());
+
+ for (KafkaTopicPartitionState<TopicPartition> partition :
partitions) {
+ Long lastProcessedOffset =
offsets.get(partition.getKafkaTopicPartition());
+ if (lastProcessedOffset != null) {
+ checkState(lastProcessedOffset >= 0, "Illegal
offset value to commit");
+
+ // committed offsets through the KafkaConsumer
need to be 1 more than the last processed offset.
+ // This does not affect Flink's
checkpoints/saved state.
+ long offsetToCommit = lastProcessedOffset + 1;
+
+
offsetsToCommit.put(partition.getKafkaPartitionHandle(), new
OffsetAndMetadata(offsetToCommit));
+ partition.setCommittedOffset(offsetToCommit);
+ }
+ }
+
+ // record the work to be committed by the main consumer thread
and make sure the consumer notices that
+ consumerThread.setOffsetsToCommit(offsetsToCommit,
commitCallback);
+ }
+
+ private String getFetcherName() {
+ return "Kafka Shuffle Fetcher";
+ }
+
+ private abstract static class KafkaShuffleElement<T> {
+
+ public boolean isRecord() {
+ return getClass() == KafkaShuffleRecord.class;
+ }
+
+ boolean isWatermark() {
+ return getClass() == KafkaShuffleWatermark.class;
+ }
+
+ KafkaShuffleRecord<T> asRecord() {
+ return (KafkaShuffleRecord<T>) this;
+ }
+
+ KafkaShuffleWatermark asWatermark() {
+ return (KafkaShuffleWatermark) this;
+ }
+ }
+
+ private static class KafkaShuffleWatermark<T> extends
KafkaShuffleElement<T> {
+ final int subtask;
+ final long watermark;
+
+ KafkaShuffleWatermark(int subtask, long watermark) {
+ this.subtask = subtask;
+ this.watermark = watermark;
+ }
+ }
+
+ private static class KafkaShuffleRecord<T> extends
KafkaShuffleElement<T> {
+ final T value;
+ final Long timestamp;
+
+ KafkaShuffleRecord(T value) {
+ this.value = value;
+ this.timestamp = null;
+ }
+
+ KafkaShuffleRecord(long timestamp, T value) {
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+ }
+
+ private static class KafkaShuffleElementDeserializer<T> implements
Serializable {
+ private transient DataInputDeserializer dis;
+
+ KafkaShuffleElementDeserializer() {
+ this.dis = new DataInputDeserializer();
+ }
+
+ KafkaShuffleElement<T> deserialize(TypeSerializer<T>
serializer, ConsumerRecord<byte[], byte[]> record)
+ throws Exception {
+ byte[] value = record.value();
+ dis.setBuffer(value);
+ int tag = IntSerializer.INSTANCE.deserialize(dis);
+
+ if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+ return new
KafkaShuffleRecord<>(serializer.deserialize(dis));
+ } else if (tag == TAG_REC_WITH_TIMESTAMP) {
+ return new
KafkaShuffleRecord<>(LongSerializer.INSTANCE.deserialize(dis),
serializer.deserialize(dis));
+ } else if (tag == TAG_WATERMARK) {
+ return new KafkaShuffleWatermark<>(
+
IntSerializer.INSTANCE.deserialize(dis),
LongSerializer.INSTANCE.deserialize(dis));
+ }
+
+ throw new UnsupportedOperationException("Unsupported
tag format");
+ }
+ }
+
+ /**
+ * WatermarkHandler to generate watermarks.
+ */
+ private static class WatermarkHandler {
+ private final int producerParallelism;
+ private final Map<Integer, Long> subtaskWatermark;
+
+ private long currentMinWatermark = Long.MIN_VALUE;
+
+ WatermarkHandler(int numberOfSubtask) {
+ this.producerParallelism = numberOfSubtask;
+ this.subtaskWatermark = new HashMap<>(numberOfSubtask);
+ }
+
+ public Optional<Watermark>
checkAndGetNewWatermark(KafkaShuffleWatermark newWatermark) {
+ // watermarks is incremental for the same partition and
PRODUCER subtask
+ Long currentSubTaskWatermark =
subtaskWatermark.get(newWatermark.subtask);
+
+ Preconditions.checkState(
+ (currentSubTaskWatermark == null) ||
(currentSubTaskWatermark <= newWatermark.watermark),
+ "Watermark should always increase");
+
+ subtaskWatermark.put(newWatermark.subtask,
newWatermark.watermark);
+
+ if (subtaskWatermark.values().size() <
producerParallelism) {
+ return Optional.empty();
+ }
Review comment:
Clarified offline: not needed, as the MAX watermark is propagated. If we want
to support bounded sources all the way, we'd need to handle
`EndOfPartitionEvents`, but we can do that in a later version. `KafkaShuffle`
will probably almost never be used with bounded sources.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and
persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ * - you would like to reuse the shuffle data and/or,
+ * - you would like to avoid a full restart of a pipeline during failure
recovery
+ *
+ * <p>Persisting shuffle is achieved by wrapping a {@link
FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link
FlinkKafkaShuffle}.
+ * Here is an example how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ * StreamExecutionEnvironment env = ...
// create execution environment
+ * DataStream<X> source = env.addSource(...)
// add data stream source
+ * DataStream<Y> dataStream = ...
// some transformation(s) based on source
+ *
+ * KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ * .persistentKeyBy(
// keyBy shuffle through kafka
+ * dataStream,
// data stream to be shuffled
+ * topic,
// Kafka topic written to
+ * producerParallelism,
// the number of tasks of a Kafka Producer
+ * numberOfPartitions,
// the number of partitions of the Kafka topic written
to
+ * kafkaProperties,
// kafka properties for Kafka Producer and Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key from `dataStream'
+ *
+ * keyedStream.transform...
// some other transformation(s)
+ *
+ * KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ * .readKeyBy(
// Read the Kafka shuffle data again for other
usages
+ * topic,
// the topic of Kafka where data is persisted
+ * env,
// execution environment, and it can be a new
environment
+ * typeInformation<Y>,
// type information of the data persisted in Kafka
+ * kafkaProperties,
// kafka properties for Kafka Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key
+ *
+ * keyedStreamReuse.transform...
// some other transformation(s)
+ * }</pre>
+ *
+ * <p>Usage of {@link FlinkKafkaShuffle#persistentKeyBy} is similar to {@link
DataStream#keyBy(KeySelector)}.
+ * The differences are:
+ *
+ * <p>1). Partitioning is done through {@link FlinkKafkaShuffleProducer}.
{@link FlinkKafkaShuffleProducer} decides
+ * which partition a key goes when writing to Kafka
+ *
+ * <p>2). Shuffle data can be reused through {@link
FlinkKafkaShuffle#readKeyBy}, as shown in the example above.
Review comment:
can be reused **in an independent job**
(within the same job, Flink already supports streaming a datastream into
multiple downstream operators)
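E.g. something like this rough sketch (topic, types, and how the parallelism/partition
settings reach the reading job are assumptions on my side, not part of this PR):
```
// a completely separate job reading the persisted shuffle data
StreamExecutionEnvironment otherEnv = StreamExecutionEnvironment.getExecutionEnvironment();

Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
// assumption: the reading job also needs the producer parallelism / partition number,
// since the consumer waits for a watermark from each producer subtask
kafkaProperties.setProperty("producer parallelism", "4");
kafkaProperties.setProperty("partition number", "8");

KeySelector<Tuple3<Integer, Long, String>, Integer> keySelector = record -> record.f0;

KeyedStream<Tuple3<Integer, Long, String>, Integer> reused = FlinkKafkaShuffle
	.readKeyBy(
		"my-shuffle-topic",
		otherEnv,
		TypeInformation.of(new TypeHint<Tuple3<Integer, Long, String>>() {}),
		kafkaProperties,
		keySelector);

reused.print();
otherEnv.execute("kafka-shuffle-consumer-reuse");
```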
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and
persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ * - you would like to reuse the shuffle data and/or,
+ * - you would like to avoid a full restart of a pipeline during failure
recovery
+ *
+ * <p>Persisting shuffle is achieved by wrapping a {@link
FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link
FlinkKafkaShuffle}.
+ * Here is an example how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ * StreamExecutionEnvironment env = ...
// create execution environment
+ * DataStream<X> source = env.addSource(...)
// add data stream source
+ * DataStream<Y> dataStream = ...
// some transformation(s) based on source
+ *
+ * KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ * .persistentKeyBy(
// keyBy shuffle through kafka
+ * dataStream,
// data stream to be shuffled
+ * topic,
// Kafka topic written to
+ * producerParallelism,
// the number of tasks of a Kafka Producer
+ * numberOfPartitions,
// the number of partitions of the Kafka topic written
to
+ * kafkaProperties,
// kafka properties for Kafka Producer and Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key from `dataStream'
+ *
+ * keyedStream.transform...
// some other transformation(s)
+ *
+ * KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ * .readKeyBy(
// Read the Kafka shuffle data again for other
usages
+ * topic,
// the topic of Kafka where data is persisted
+ * env,
// execution environment, and it can be a new
environment
+ * typeInformation<Y>,
// type information of the data persisted in Kafka
+ * kafkaProperties,
// kafka properties for Kafka Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key
+ *
+ * keyedStreamReuse.transform...
// some other transformation(s)
+ * }</pre>
+ *
+ * <p>Usage of {@link FlinkKafkaShuffle#persistentKeyBy} is similar to {@link
DataStream#keyBy(KeySelector)}.
+ * The differences are:
+ *
+ * <p>1). Partitioning is done through {@link FlinkKafkaShuffleProducer}.
{@link FlinkKafkaShuffleProducer} decides
+ * which partition a key goes when writing to Kafka
+ *
+ * <p>2). Shuffle data can be reused through {@link
FlinkKafkaShuffle#readKeyBy}, as shown in the example above.
+ *
+ * <p>3). Job execution is decoupled by the persistent Kafka message bus. In
the example, the job execution graph is
+ * decoupled to three regions: `KafkaShuffleProducer',
`KafkaShuffleConsumer' and `KafkaShuffleConsumerReuse'
+ * through `PERSISTENT DATA` as shown below. If any region
fails the execution, the other two keep progressing.
+ *
+ * <p><pre>
+ * source -> ... KafkaShuffleProducer -> PERSISTENT DATA ->
KafkaShuffleConsumer -> ...
+ * |
+ * | ---------->
KafkaShuffleConsumerReuse -> ...
+ * </pre>
+ */
+@Experimental
+public class FlinkKafkaShuffle {
+ static final String PRODUCER_PARALLELISM = "producer parallelism";
+ static final String PARTITION_NUMBER = "partition number";
+
+ /**
+ * Uses Kafka as a message bus to persist keyBy shuffle.
+ *
+ * <p>Persisting keyBy shuffle is achieved by wrapping a {@link
FlinkKafkaShuffleProducer} and
+ * {@link FlinkKafkaShuffleConsumer} together.
+ *
+ * <p>On the producer side, {@link FlinkKafkaShuffleProducer}
+ * is similar to {@link DataStream#keyBy(KeySelector)}. They use the
same key group assignment function
+ * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to
decide which partition a key goes.
+ * Hence, each producer task can potentially write to each Kafka
partition based on where the key goes.
+ * Here, `numberOfPartitions` equals to the key group size.
+ * In the case of using {@link TimeCharacteristic#EventTime}, each
producer task broadcasts its watermark
+ * to ALL of the Kafka partitions to make sure watermark information is
propagated correctly.
+ *
+ * <p>On the consumer side, each consumer task should read partitions
equal to the key group indices
+ * it is assigned. `numberOfPartitions` is the maximum parallelism of
the consumer. This version only
+ * supports numberOfPartitions = consumerParallelism.
+ * In the case of using {@link TimeCharacteristic#EventTime}, a
consumer task is responsible to emit
+ * watermarks. Watermarks are read from the corresponding Kafka
partitions. Notice that a consumer task only starts
+ * to emit a watermark after reading at least one watermark from each
producer task to make sure watermarks
+ * are monotonically increasing. Hence a consumer task needs to know
`producerParallelism` as well.
+ *
+ * @see FlinkKafkaShuffle#writeKeyBy
+ * @see FlinkKafkaShuffle#readKeyBy
+ *
+ * @param dataStream Data stream to be shuffled
+ * @param topic Kafka topic written to
+ * @param producerParallelism Parallelism of producer
+ * @param numberOfPartitions Number of partitions
+ * @param properties Kafka properties
+ * @param keySelector Key selector to retrieve key
from `dataStream'
+ * @param <T> Type of the input data
stream
+ * @param <K> Type of key
+ */
+ public static <T, K> KeyedStream<T, K> persistentKeyBy(
+ DataStream<T> dataStream,
+ String topic,
+ int producerParallelism,
+ int numberOfPartitions,
+ Properties properties,
+ KeySelector<T, K> keySelector) {
+ // KafkaProducer#propsToMap uses Properties purely as a HashMap
without considering the default properties
+ // So we have to flatten the default property to first level
elements.
+ Properties kafkaProperties = PropertiesUtil.flatten(properties);
+ kafkaProperties.setProperty(PRODUCER_PARALLELISM,
String.valueOf(producerParallelism));
+ kafkaProperties.setProperty(PARTITION_NUMBER,
String.valueOf(numberOfPartitions));
+
+ StreamExecutionEnvironment env =
dataStream.getExecutionEnvironment();
+
+ writeKeyBy(dataStream, topic, kafkaProperties, keySelector);
+ return readKeyBy(topic, env, dataStream.getType(),
kafkaProperties, keySelector);
+ }
+
+ /**
+ * Uses Kafka as a message bus to persist keyBy shuffle.
+ *
+ * @param dataStream Data stream to be shuffled
+ * @param topic Kafka topic written to
+ * @param producerParallelism Parallelism of producer
+ * @param numberOfPartitions Number of partitions
+ * @param properties Kafka properties
+ * @param fields Key positions from the
input data stream
+ * @param <T> Type of the input data
stream
+ */
+ public static <T> KeyedStream<T, Tuple> persistentKeyBy(
+ DataStream<T> dataStream,
+ String topic,
+ int producerParallelism,
+ int numberOfPartitions,
+ Properties properties,
+ int... fields) {
+ return persistentKeyBy(
+ dataStream,
+ topic,
+ producerParallelism,
+ numberOfPartitions,
+ properties,
+ keySelector(dataStream, fields));
+ }
+
+ /**
+ * The write side of {@link FlinkKafkaShuffle#persistentKeyBy}.
+ *
+ * <p>This function contains a {@link FlinkKafkaShuffleProducer} to
shuffle and persist data in Kafka.
+ * {@link FlinkKafkaShuffleProducer} uses the same key group assignment
function
+ * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to
decide which partition a key goes to.
+ * Hence, each producer task can potentially write to each Kafka
partition based on the key.
+ * Here, the number of partitions equals the key group size.
+ * In the case of using {@link TimeCharacteristic#EventTime}, each
producer task broadcasts each watermark
+ * to all of the Kafka partitions to make sure watermark information is
propagated properly.
+ *
+ * <p>Attention: make sure kafkaProperties include
+ * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} and {@link
FlinkKafkaShuffle#PARTITION_NUMBER} explicitly.
+ * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} is the parallelism of
the producer.
+ * {@link FlinkKafkaShuffle#PARTITION_NUMBER} is the number of
partitions.
+ * They are not necessarily the same and can be set
independently.
Review comment:
very good
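For illustration, a minimal sketch of what a direct caller of `writeKeyBy` would have to do, mirroring what `persistentKeyBy` does internally. The topic name, parallelism, and partition count are made-up values; the literal property keys correspond to the `PRODUCER_PARALLELISM` and `PARTITION_NUMBER` constants above (which are package-private):
```
// Sketch only: values are hypothetical; the keys mirror the constants above.
Properties kafkaProperties = PropertiesUtil.flatten(baseProperties);
kafkaProperties.setProperty("producer parallelism", String.valueOf(4));
kafkaProperties.setProperty("partition number", String.valueOf(8));

FlinkKafkaShuffle.writeKeyBy(dataStream, "shuffle-topic", kafkaProperties, keySelector);
```
As the Javadoc notes, the two values do not have to be equal and can be set independently.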
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleTestBase.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+
+import org.junit.BeforeClass;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+
+/**
+ * Base Test Class for KafkaShuffle.
+ */
+public class KafkaShuffleTestBase extends KafkaConsumerTestBase {
+ static final long INIT_TIMESTAMP = System.currentTimeMillis();
Review comment:
As I have said, if I want to compare two shuffle topics created by two
commits to track down some bug, the changing timestamp will actually be a big
PITA.
Tests don't need to be realistic in the sense of data, but should be
realistic in terms of workload (or queries).
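A minimal sketch of the suggested change; the concrete constant below is only an example:
```
// A fixed base timestamp instead of System.currentTimeMillis():
// the generated test records stay identical across runs, so shuffle topics
// written by two different commits can be compared directly.
static final long INIT_TIMESTAMP = 1_577_836_800_000L; // 2020-01-01T00:00:00Z
```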
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * {@link FlinkKafkaShuffle} uses Kafka as a message bus to shuffle and
persist data at the same time.
+ *
+ * <p>Persisting shuffle data is useful when
+ * - you would like to reuse the shuffle data, and/or
+ * - you would like to avoid a full restart of a pipeline during failure
recovery.
+ *
+ * <p>Persisting shuffle is achieved by wrapping a {@link
FlinkKafkaShuffleProducer} and
+ * a {@link FlinkKafkaShuffleConsumer} together into a {@link
FlinkKafkaShuffle}.
+ * Here is an example of how to use a {@link FlinkKafkaShuffle}.
+ *
+ * <p><pre>{@code
+ * StreamExecutionEnvironment env = ...
// create execution environment
+ * DataStream<X> source = env.addSource(...)
// add data stream source
+ * DataStream<Y> dataStream = ...
// some transformation(s) based on source
+ *
+ * KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
+ * .persistentKeyBy(
// keyBy shuffle through kafka
+ * dataStream,
// data stream to be shuffled
+ * topic,
// Kafka topic written to
+ * producerParallelism,
// the number of tasks of a Kafka Producer
+ * numberOfPartitions,
// the number of partitions of the Kafka topic written
to
+ * kafkaProperties,
// kafka properties for Kafka Producer and Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key from `dataStream'
+ *
+ * keyedStream.transform...
// some other transformation(s)
+ *
+ * KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
+ * .readKeyBy(
// Read the Kafka shuffle data again for other
usages
+ * topic,
// the topic of Kafka where data is persisted
+ * env,
// execution environment, and it can be a new
environment
+ * typeInformation<Y>,
// type information of the data persisted in Kafka
+ * kafkaProperties,
// kafka properties for Kafka Consumer
+ * keySelector<Y, KEY>);
// key selector to retrieve key
+ *
+ * keyedStreamReuse.transform...
// some other transformation(s)
+ * }</pre>
+ *
+ * <p>Usage of {@link FlinkKafkaShuffle#persistentKeyBy} is similar to {@link
DataStream#keyBy(KeySelector)}.
+ * The differences are:
+ *
+ * <p>1). Partitioning is done through {@link FlinkKafkaShuffleProducer}.
{@link FlinkKafkaShuffleProducer} decides
+ * which partition a key goes to when writing to Kafka.
+ *
+ * <p>2). Shuffle data can be reused through {@link
FlinkKafkaShuffle#readKeyBy}, as shown in the example above.
+ *
+ * <p>3). Job execution is decoupled by the persistent Kafka message bus. In
the example, the job execution graph is
+ * decoupled into three regions: `KafkaShuffleProducer',
`KafkaShuffleConsumer' and `KafkaShuffleConsumerReuse'
+ * through `PERSISTENT DATA` as shown below. If any region
fails, the other two keep progressing.
+ *
+ * <p><pre>
+ * source -> ... KafkaShuffleProducer -> PERSISTENT DATA ->
KafkaShuffleConsumer -> ...
+ * |
+ * | ---------->
KafkaShuffleConsumerReuse -> ...
+ * </pre>
+ */
+@Experimental
+public class FlinkKafkaShuffle {
+ static final String PRODUCER_PARALLELISM = "producer parallelism";
+ static final String PARTITION_NUMBER = "partition number";
+
+ /**
+ * Uses Kafka as a message bus to persist keyBy shuffle.
+ *
+ * <p>Persisting keyBy shuffle is achieved by wrapping a {@link
FlinkKafkaShuffleProducer} and
+ * {@link FlinkKafkaShuffleConsumer} together.
+ *
+ * <p>On the producer side, {@link FlinkKafkaShuffleProducer}
+ * is similar to {@link DataStream#keyBy(KeySelector)}. They use the
same key group assignment function
+ * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to
decide which partition a key goes to.
+ * Hence, each producer task can potentially write to each Kafka
partition based on where the key goes.
+ * Here, `numberOfPartitions` equals the key group size.
+ * In the case of using {@link TimeCharacteristic#EventTime}, each
producer task broadcasts its watermark
+ * to ALL of the Kafka partitions to make sure watermark information is
propagated correctly.
+ *
+ * <p>On the consumer side, each consumer task should read the partitions
corresponding to the key group indices
+ * it is assigned. `numberOfPartitions` is the maximum parallelism of
the consumer. This version only
+ * supports numberOfPartitions = consumerParallelism.
+ * In the case of using {@link TimeCharacteristic#EventTime}, a
consumer task is responsible for emitting
+ * watermarks. Watermarks are read from the corresponding Kafka
partitions. Notice that a consumer task only starts
+ * to emit a watermark after reading at least one watermark from each
producer task to make sure watermarks
+ * are monotonically increasing. Hence a consumer task needs to know
`producerParallelism` as well.
+ *
+ * @see FlinkKafkaShuffle#writeKeyBy
+ * @see FlinkKafkaShuffle#readKeyBy
+ *
+ * @param dataStream Data stream to be shuffled
+ * @param topic Kafka topic written to
+ * @param producerParallelism Parallelism of producer
+ * @param numberOfPartitions Number of partitions
+ * @param properties Kafka properties
+ * @param keySelector Key selector to retrieve key
from `dataStream'
+ * @param <T> Type of the input data
stream
+ * @param <K> Type of key
+ */
+ public static <T, K> KeyedStream<T, K> persistentKeyBy(
+ DataStream<T> dataStream,
+ String topic,
+ int producerParallelism,
+ int numberOfPartitions,
+ Properties properties,
+ KeySelector<T, K> keySelector) {
+ // KafkaProducer#propsToMap uses Properties purely as a HashMap
without considering the default properties
+ // So we have to flatten the default properties to first-level
elements.
+ Properties kafkaProperties = PropertiesUtil.flatten(properties);
+ kafkaProperties.setProperty(PRODUCER_PARALLELISM,
String.valueOf(producerParallelism));
+ kafkaProperties.setProperty(PARTITION_NUMBER,
String.valueOf(numberOfPartitions));
+
+ StreamExecutionEnvironment env =
dataStream.getExecutionEnvironment();
+
+ writeKeyBy(dataStream, topic, kafkaProperties, keySelector);
+ return readKeyBy(topic, env, dataStream.getType(),
kafkaProperties, keySelector);
+ }
+
+ /**
+ * Uses Kafka as a message bus to persist keyBy shuffle.
+ *
+ * @param dataStream Data stream to be shuffled
+ * @param topic Kafka topic written to
+ * @param producerParallelism Parallelism of producer
+ * @param numberOfPartitions Number of partitions
+ * @param properties Kafka properties
+ * @param fields Key positions from the
input data stream
+ * @param <T> Type of the input data
stream
+ */
+ public static <T> KeyedStream<T, Tuple> persistentKeyBy(
+ DataStream<T> dataStream,
+ String topic,
+ int producerParallelism,
+ int numberOfPartitions,
+ Properties properties,
+ int... fields) {
+ return persistentKeyBy(
+ dataStream,
+ topic,
+ producerParallelism,
+ numberOfPartitions,
+ properties,
+ keySelector(dataStream, fields));
+ }
+
+ /**
+ * The write side of {@link FlinkKafkaShuffle#persistentKeyBy}.
+ *
+ * <p>This function contains a {@link FlinkKafkaShuffleProducer} to
shuffle and persist data in Kafka.
+ * {@link FlinkKafkaShuffleProducer} uses the same key group assignment
function
+ * {@link KeyGroupRangeAssignment#assignKeyToParallelOperator} to
decide which partition a key goes to.
+ * Hence, each producer task can potentially write to each Kafka
partition based on the key.
+ * Here, the number of partitions equals the key group size.
+ * In the case of using {@link TimeCharacteristic#EventTime}, each
producer task broadcasts each watermark
+ * to all of the Kafka partitions to make sure watermark information is
propagated properly.
+ *
+ * <p>Attention: make sure kafkaProperties include
+ * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} and {@link
FlinkKafkaShuffle#PARTITION_NUMBER} explicitly.
+ * {@link FlinkKafkaShuffle#PRODUCER_PARALLELISM} is the parallelism of
the producer.
+ * {@link FlinkKafkaShuffle#PARTITION_NUMBER} is the number of
partitions.
+ * They are not necessarily the same and can be set
independently.
+ *
+ * @see FlinkKafkaShuffle#persistentKeyBy
+ * @see FlinkKafkaShuffle#readKeyBy
+ *
+ * @param dataStream Data stream to be shuffled
+ * @param topic Kafka topic written to
+ * @param kafkaProperties Kafka properties for Kafka
Producer
+ * @param keySelector Key selector to retrieve key
from `dataStream'
+ * @param <T> Type of the input data
stream
+ * @param <K> Type of key
+ */
+ public static <T, K> void writeKeyBy(
+ DataStream<T> dataStream,
+ String topic,
+ Properties kafkaProperties,
+ KeySelector<T, K> keySelector) {
+
+ StreamExecutionEnvironment env =
dataStream.getExecutionEnvironment();
+ TypeSerializer<T> typeSerializer =
dataStream.getType().createSerializer(env.getConfig());
+
+ // write data to Kafka
+ FlinkKafkaShuffleProducer<T, K> kafkaProducer = new
FlinkKafkaShuffleProducer<>(
+ topic,
+ typeSerializer,
+ kafkaProperties,
+ env.clean(keySelector),
+ FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
+ FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+
+ // make sure the sink parallelism is set to producerParallelism
+ Preconditions.checkArgument(
+ kafkaProperties.getProperty(PRODUCER_PARALLELISM) !=
null,
+ "Missing producer parallelism for Kafka Shuffle");
+ int producerParallelism =
PropertiesUtil.getInt(kafkaProperties, PRODUCER_PARALLELISM, Integer.MIN_VALUE);
+
+ addKafkaShuffle(dataStream, kafkaProducer, producerParallelism);
+ }
+
+ /**
+ * The write side of {@link FlinkKafkaShuffle#persistentKeyBy}.
Review comment:
copy javadoc
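For context, a minimal sketch (types, topic, and variable names are hypothetical) of how the write and read halves are meant to be combined, following the class Javadoc above:
```
// Writing job: persist the keyed shuffle into Kafka.
FlinkKafkaShuffle.writeKeyBy(dataStream, "shuffle-topic", kafkaProperties, keySelector);

// Reading job (possibly a separate application/environment):
// recover the same KeyedStream from the persisted topic.
KeyedStream<MyEvent, String> keyed = FlinkKafkaShuffle.readKeyBy(
        "shuffle-topic", env, TypeInformation.of(MyEvent.class), kafkaProperties, keySelector);
```
Here `kafkaProperties` is assumed to already contain the producer parallelism and partition number, as in `persistentKeyBy`.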
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]