AHeise commented on a change in pull request #11725:
URL: https://github.com/apache/flink/pull/11725#discussion_r425298217
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -210,7 +210,7 @@
/**
* The name of the default topic this producer is writing data to.
*/
- private final String defaultTopicId;
+ protected final String defaultTopicId;
Review comment:
Would package-private suffice? (Same for all similar changes in this and the next commit.)
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -210,7 +210,7 @@
/**
Review comment:
Commit message should include a bit more detail, so that someone who
doesn't know FLINK-15670 by heart knows what's going on.
For example
```
[FLINK-15670][connector] Adds the producer for KafkaShuffle.
KafkaShuffle provides a transparent Kafka source and sink pair, through
which the network traffic of one shuffle step is redirected.
```
##########
File path: flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
##########
@@ -108,6 +109,27 @@ public static boolean getBoolean(Properties config, String key, boolean defaultV
}
}
+ /**
+ * Flatten a recursive {@link Properties} to a first level property map.
+ * In some cases, {KafkaProducer#propsToMap} for example, Properties is used purely as a HashTable
Review comment:
It's not a proper link, but I guess that's also not possible because of the redirection. Maybe just use `{@code KafkaProducer#propsToMap}`.
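For illustration, a minimal sketch of how the javadoc could read with `{@code}`; the `flatten` body below is only an assumed implementation, not necessarily the one from this PR:
```
import java.util.Properties;

public final class PropertiesFlattenSketch {

	/**
	 * Flatten a recursive {@link Properties} to a first-level property map.
	 *
	 * <p>In some cases, e.g. {@code KafkaProducer#propsToMap}, Properties is used purely as a
	 * hash table and its default (parent) properties would otherwise be ignored.
	 */
	public static Properties flatten(Properties config) {
		Properties flattened = new Properties();
		// stringPropertyNames() walks the chain of default Properties, so parent entries are included
		for (String key : config.stringPropertyNames()) {
			flattened.setProperty(key, config.getProperty(key));
		}
		return flattened;
	}
}
```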
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN>
{
+ private final KafkaSerializer<IN> kafkaSerializer;
+ private final KeySelector<IN, KEY> keySelector;
+ private final int numberOfPartitions;
+
+ FlinkKafkaShuffleProducer(
+ String defaultTopicId,
+ TypeInformationSerializationSchema<IN> schema,
+ Properties props,
+ KeySelector<IN, KEY> keySelector,
+ Semantic semantic,
+ int kafkaProducersPoolSize) {
+ super(defaultTopicId, (element, timestamp) -> null, props,
semantic, kafkaProducersPoolSize);
+
+ this.kafkaSerializer = new
KafkaSerializer<>(schema.getSerializer());
+ this.keySelector = keySelector;
+
+ Preconditions.checkArgument(
+ props.getProperty(PARTITION_NUMBER) != null,
+ "Missing partition number for Kafka Shuffle");
+ numberOfPartitions = PropertiesUtil.getInt(props,
PARTITION_NUMBER, Integer.MIN_VALUE);
+ }
+
+ /**
+ * This is the function invoked to handle each element.
+ * @param transaction transaction state;
+ * elements are written to Kafka in transactions to
guarantee different level of data consistency
+ * @param next element to handle
+ * @param context context needed to handle the element
+ * @throws FlinkKafkaException for kafka error
+ */
+ @Override
+ public void invoke(KafkaTransactionState transaction, IN next, Context
context) throws FlinkKafkaException {
+ checkErroneous();
+
+ // write timestamp to Kafka if timestamp is available
+ Long timestamp = context.timestamp();
+
+ int[] partitions = getPartitions(transaction);
+ int partitionIndex;
+ try {
+ partitionIndex = KeyGroupRangeAssignment
+
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length,
partitions.length);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to assign a partition number to record");
Review comment:
Please use exception chaining:
`throw new RuntimeException("Fail to assign a partition number to record", e)`
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN>
{
+ private final KafkaSerializer<IN> kafkaSerializer;
+ private final KeySelector<IN, KEY> keySelector;
+ private final int numberOfPartitions;
+
+ FlinkKafkaShuffleProducer(
+ String defaultTopicId,
+ TypeInformationSerializationSchema<IN> schema,
+ Properties props,
+ KeySelector<IN, KEY> keySelector,
+ Semantic semantic,
+ int kafkaProducersPoolSize) {
+ super(defaultTopicId, (element, timestamp) -> null, props,
semantic, kafkaProducersPoolSize);
+
+ this.kafkaSerializer = new
KafkaSerializer<>(schema.getSerializer());
+ this.keySelector = keySelector;
+
+ Preconditions.checkArgument(
+ props.getProperty(PARTITION_NUMBER) != null,
+ "Missing partition number for Kafka Shuffle");
+ numberOfPartitions = PropertiesUtil.getInt(props,
PARTITION_NUMBER, Integer.MIN_VALUE);
+ }
+
+ /**
+ * This is the function invoked to handle each element.
+ * @param transaction transaction state;
+ * elements are written to Kafka in transactions to
guarantee different level of data consistency
+ * @param next element to handle
+ * @param context context needed to handle the element
+ * @throws FlinkKafkaException for kafka error
+ */
+ @Override
+ public void invoke(KafkaTransactionState transaction, IN next, Context
context) throws FlinkKafkaException {
+ checkErroneous();
+
+ // write timestamp to Kafka if timestamp is available
+ Long timestamp = context.timestamp();
+
+ int[] partitions = getPartitions(transaction);
+ int partitionIndex;
+ try {
+ partitionIndex = KeyGroupRangeAssignment
+
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length,
partitions.length);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to assign a partition
number to record");
+ }
+
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+ defaultTopicId, partitionIndex, timestamp, null,
kafkaSerializer.serializeRecord(next, timestamp));
+ pendingRecords.incrementAndGet();
+ transaction.getProducer().send(record, callback);
+ }
+
+ /**
+ * This is the function invoked to handle each watermark.
+ * @param watermark watermark to handle
+ * @throws FlinkKafkaException for kafka error
+ */
+ public void invoke(Watermark watermark) throws FlinkKafkaException {
+ checkErroneous();
+ KafkaTransactionState transaction = currentTransaction();
+
+ int[] partitions = getPartitions(transaction);
+ int subtask = getRuntimeContext().getIndexOfThisSubtask();
+
+ // broadcast watermark
+ long timestamp = watermark.getTimestamp();
+ for (int partition : partitions) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(
+ defaultTopicId, partition, timestamp, null,
kafkaSerializer.serializeWatermark(watermark, subtask));
+ pendingRecords.incrementAndGet();
+ transaction.getProducer().send(record, callback);
+ }
+ }
+
+ private int[] getPartitions(KafkaTransactionState transaction) {
+ int[] partitions = topicPartitionsMap.get(defaultTopicId);
+ if (partitions == null) {
+ partitions = getPartitionsByTopic(defaultTopicId,
transaction.getProducer());
+ topicPartitionsMap.put(defaultTopicId, partitions);
+ }
+
+ Preconditions.checkArgument(partitions.length ==
numberOfPartitions);
+
+ return partitions;
+ }
+
+ /**
+ * Flink Kafka Shuffle Serializer.
+ */
+ public static final class KafkaSerializer<IN> implements Serializable {
+ public static final int TAG_REC_WITH_TIMESTAMP = 0;
+ public static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
+ public static final int TAG_WATERMARK = 2;
+
+ private final TypeSerializer<IN> serializer;
+
+ private transient DataOutputSerializer dos;
+
+ KafkaSerializer(TypeSerializer<IN> serializer) {
+ this.serializer = serializer;
+ }
+
+ /**
+ * Format: TAG, (timestamp), record.
+ */
+ byte[] serializeRecord(IN record, Long timestamp) {
+ if (dos == null) {
+ dos = new DataOutputSerializer(16);
+ }
+
+ try {
+ if (timestamp == null) {
+ dos.writeInt(TAG_REC_WITHOUT_TIMESTAMP);
Review comment:
Would `writeByte` suffice?
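A minimal sketch of that suggestion, assuming the tags are narrowed to a byte (the helper class is hypothetical; constant names mirror the PR):
```
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

final class TagWriteSketch {
	static final byte TAG_REC_WITH_TIMESTAMP = 0;
	static final byte TAG_REC_WITHOUT_TIMESTAMP = 1;
	static final byte TAG_WATERMARK = 2;

	static void writeTag(DataOutputSerializer dos, byte tag) throws IOException {
		// writeByte(int) writes only the low-order byte, saving three bytes per record compared to writeInt
		dos.writeByte(tag);
		// the reader would then use DataInputDeserializer#readByte() instead of readInt()
	}
}
```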
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair
together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+ static final String PRODUCER_PARALLELISM = "producer parallelism";
+ static final String PARTITION_NUMBER = "partition number";
+
+ /**
+ * Write to and read from a kafka shuffle with the partition decided by keys.
+ * Consumers should read partitions equal to the key group indices they are assigned.
+ * The number of partitions is the maximum parallelism of the receiving operator.
+ * This version only supports numberOfPartitions = consumerParallelism.
+ *
+ * @param inputStream input stream to the kafka
Review comment:
Descriptions of parameters are usually capitalized to make them easier to read (also in code).
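For example (hypothetical method stub; only the capitalized `@param` descriptions matter here):
```
final class JavadocParamStyleSketch {
	/**
	 * Writes the given stream to a Kafka shuffle.
	 *
	 * @param inputStream Input stream to write to Kafka
	 * @param topic Kafka topic the shuffle writes to and reads from
	 */
	static <T> void writeKeyBy(Iterable<T> inputStream, String topic) {
		// no-op: only the javadoc style is illustrated
	}
}
```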
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition>
{
Review comment:
I see huge duplication with `KafkaFetcher`. I'm curious why you didn't choose to subclass it in the same way as for the producer/consumer.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition>
{
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+ private final WatermarkHandler watermarkHandler;
+ //
------------------------------------------------------------------------
+
+ /** The schema to convert between Kafka's byte messages, and Flink's
objects. */
+ private final KafkaShuffleElementDeserializer<T> deserializer;
+
+ /** Serializer to serialize record. */
+ private final TypeSerializer<T> serializer;
+
+ /** The handover of data and exceptions between the consumer thread and
the task thread. */
+ private final Handover handover;
+
+ /** The thread that runs the actual KafkaConsumer and hand the record
batches to this fetcher. */
+ private final KafkaConsumerThread consumerThread;
+
+ /** Flag to mark the main work loop as alive. */
+ private volatile boolean running = true;
+
+ public KafkaShuffleFetcher(
+ SourceFunction.SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long>
assignedPartitionsWithInitialOffsets,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>>
watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>>
watermarksPunctuated,
+ ProcessingTimeService processingTimeProvider,
+ long autoWatermarkInterval,
+ ClassLoader userCodeClassLoader,
+ String taskNameWithSubtasks,
+ TypeSerializer<T> serializer,
+ Properties kafkaProperties,
+ long pollTimeout,
+ MetricGroup subtaskMetricGroup,
+ MetricGroup consumerMetricGroup,
+ boolean useMetrics,
+ int producerParallelism) throws Exception {
+ super(
+ sourceContext,
+ assignedPartitionsWithInitialOffsets,
+ watermarksPeriodic,
+ watermarksPunctuated,
+ processingTimeProvider,
+ autoWatermarkInterval,
+ userCodeClassLoader,
+ consumerMetricGroup,
+ useMetrics);
+ this.deserializer = new KafkaShuffleElementDeserializer<>();
+ this.serializer = serializer;
+ this.handover = new Handover();
+ this.consumerThread = new KafkaConsumerThread(
+ LOG,
+ handover,
+ kafkaProperties,
+ unassignedPartitionsQueue,
+ getFetcherName() + " for " + taskNameWithSubtasks,
+ pollTimeout,
+ useMetrics,
+ consumerMetricGroup,
+ subtaskMetricGroup);
+ this.watermarkHandler = new
WatermarkHandler(producerParallelism);
+ }
+
+ //
------------------------------------------------------------------------
+ // Fetcher work methods
+ //
------------------------------------------------------------------------
+
+ @Override
+ public void runFetchLoop() throws Exception {
+ try {
+ final Handover handover = this.handover;
+
+ // kick off the actual Kafka consumer
+ consumerThread.start();
+
+ while (running) {
+ // this blocks until we get the next records
+ // it automatically re-throws exceptions
encountered in the consumer thread
+ final ConsumerRecords<byte[], byte[]> records =
handover.pollNext();
+
+ // get the records for each topic partition
+ for (KafkaTopicPartitionState<TopicPartition>
partition : subscribedPartitionStates()) {
+ List<ConsumerRecord<byte[], byte[]>>
partitionRecords =
+
records.records(partition.getKafkaPartitionHandle());
+
+ for (ConsumerRecord<byte[], byte[]>
record : partitionRecords) {
+ final KafkaShuffleElement<T>
element = deserializer.deserialize(serializer, record);
+
+ // TODO: do we need to check
the end of stream if reaching the end watermark?
+
+ if (element.isRecord()) {
+ // timestamp is
inherent from upstream
+ // If using
ProcessTime, timestamp is going to be ignored (upstream does not include
timestamp as well)
+ // If using
IngestionTime, timestamp is going to be overwritten
+ // If using EventTime,
timestamp is going to be used
+ synchronized
(checkpointLock) {
+
KafkaShuffleRecord<T> elementAsRecord = element.asRecord();
+
sourceContext.collectWithTimestamp(
+
elementAsRecord.value,
+
elementAsRecord.timestamp == null ? record.timestamp() :
elementAsRecord.timestamp);
+
partition.setOffset(record.offset());
+ }
+ } else if
(element.isWatermark()) {
+ final
KafkaShuffleWatermark watermark = element.asWatermark();
+ Optional<Watermark>
newWatermark = watermarkHandler.checkAndGetNewWatermark(watermark);
+
newWatermark.ifPresent(sourceContext::emitWatermark);
+ }
+ }
+ }
+ }
+ }
+ finally {
+ // this signals the consumer thread that no more work
is to be done
+ consumerThread.shutdown();
+ }
+
+ // on a clean exit, wait for the runner thread
+ try {
+ consumerThread.join();
+ }
+ catch (InterruptedException e) {
+ // may be the result of a wake-up interruption after an
exception.
+ // we ignore this here and only restore the
interruption state
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // flag the main thread to exit. A thread interrupt will come
anyways.
+ running = false;
+ handover.close();
+ consumerThread.shutdown();
+ }
+
+ @Override
+ protected TopicPartition createKafkaPartitionHandle(KafkaTopicPartition
partition) {
+ return new TopicPartition(partition.getTopic(),
partition.getPartition());
+ }
+
+ @Override
+ protected void doCommitInternalOffsetsToKafka(
+ Map<KafkaTopicPartition, Long> offsets,
+ @Nonnull KafkaCommitCallback commitCallback) throws
Exception {
+ @SuppressWarnings("unchecked")
+ List<KafkaTopicPartitionState<TopicPartition>> partitions =
subscribedPartitionStates();
+
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new
HashMap<>(partitions.size());
+
+ for (KafkaTopicPartitionState<TopicPartition> partition :
partitions) {
+ Long lastProcessedOffset =
offsets.get(partition.getKafkaTopicPartition());
+ if (lastProcessedOffset != null) {
+ checkState(lastProcessedOffset >= 0, "Illegal
offset value to commit");
+
+ // committed offsets through the KafkaConsumer
need to be 1 more than the last processed offset.
+ // This does not affect Flink's
checkpoints/saved state.
+ long offsetToCommit = lastProcessedOffset + 1;
+
+
offsetsToCommit.put(partition.getKafkaPartitionHandle(), new
OffsetAndMetadata(offsetToCommit));
+ partition.setCommittedOffset(offsetToCommit);
+ }
+ }
+
+ // record the work to be committed by the main consumer thread
and make sure the consumer notices that
+ consumerThread.setOffsetsToCommit(offsetsToCommit,
commitCallback);
+ }
+
+ private String getFetcherName() {
+ return "Kafka Shuffle Fetcher";
+ }
+
+ /**
+ * An element in a KafkaShuffle. Can be a record or a Watermark.
+ */
+ @VisibleForTesting
+ public abstract static class KafkaShuffleElement<T> {
+
+ public boolean isRecord() {
+ return getClass() == KafkaShuffleRecord.class;
+ }
+
+ public boolean isWatermark() {
+ return getClass() == KafkaShuffleWatermark.class;
+ }
+
+ public KafkaShuffleRecord<T> asRecord() {
+ return (KafkaShuffleRecord<T>) this;
+ }
+
+ public KafkaShuffleWatermark asWatermark() {
+ return (KafkaShuffleWatermark) this;
+ }
+ }
+
+ /**
+ * A watermark element in a KafkaShuffle. It includes
+ * - subtask index where the watermark is coming from
+ * - watermark timestamp
+ */
+ @VisibleForTesting
+ public static class KafkaShuffleWatermark<T> extends
KafkaShuffleElement<T> {
+ final int subtask;
+ final long watermark;
+
+ KafkaShuffleWatermark(int subtask, long watermark) {
+ this.subtask = subtask;
+ this.watermark = watermark;
+ }
+
+ public int getSubtask() {
+ return subtask;
+ }
+
+ public long getWatermark() {
+ return watermark;
+ }
+ }
+
+ /**
+ * One value with Type T in a KafkaShuffle. This stores the value and
an optional associated timestamp.
+ */
+ @VisibleForTesting
+ public static class KafkaShuffleRecord<T> extends
KafkaShuffleElement<T> {
+ final T value;
+ final Long timestamp;
+
+ KafkaShuffleRecord(T value) {
+ this.value = value;
+ this.timestamp = null;
+ }
+
+ KafkaShuffleRecord(long timestamp, T value) {
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ public T getValue() {
+ return value;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+ }
+
+ /**
+ * Deserializer for KafkaShuffleElement.
+ * @param <T>
+ */
+ @VisibleForTesting
+ public static class KafkaShuffleElementDeserializer<T> implements
Serializable {
+ private transient DataInputDeserializer dis;
+
+ @VisibleForTesting
+ public KafkaShuffleElementDeserializer() {
+ this.dis = new DataInputDeserializer();
+ }
+
+ @VisibleForTesting
+ public KafkaShuffleElement<T> deserialize(TypeSerializer<T> serializer, ConsumerRecord<byte[], byte[]> record)
+ throws Exception {
+ byte[] value = record.value();
+ dis.setBuffer(value);
Review comment:
`dis` is not initialized when `KafkaShuffleElementDeserializer` gets
deserialized.
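One possible fix, sketched with the class reduced to the transient-field handling (names mirror the PR): create the buffer lazily so it also exists after the object has gone through Java deserialization.
```
import org.apache.flink.core.memory.DataInputDeserializer;

import java.io.Serializable;

/** Reduced sketch of KafkaShuffleElementDeserializer: only the transient buffer handling is shown. */
final class LazyDeserializerSketch implements Serializable {
	private transient DataInputDeserializer dis;

	private DataInputDeserializer buffer() {
		// after Java deserialization the transient field is null, so (re-)create it on first use
		if (dis == null) {
			dis = new DataInputDeserializer();
		}
		return dis;
	}

	void setRecordBuffer(byte[] value) {
		buffer().setBuffer(value);
	}
}
```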
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN>
{
+ private final KafkaSerializer<IN> kafkaSerializer;
+ private final KeySelector<IN, KEY> keySelector;
+ private final int numberOfPartitions;
+
+ FlinkKafkaShuffleProducer(
+ String defaultTopicId,
+ TypeInformationSerializationSchema<IN> schema,
+ Properties props,
+ KeySelector<IN, KEY> keySelector,
+ Semantic semantic,
+ int kafkaProducersPoolSize) {
+ super(defaultTopicId, (element, timestamp) -> null, props,
semantic, kafkaProducersPoolSize);
+
+ this.kafkaSerializer = new
KafkaSerializer<>(schema.getSerializer());
+ this.keySelector = keySelector;
+
+ Preconditions.checkArgument(
+ props.getProperty(PARTITION_NUMBER) != null,
+ "Missing partition number for Kafka Shuffle");
+ numberOfPartitions = PropertiesUtil.getInt(props,
PARTITION_NUMBER, Integer.MIN_VALUE);
+ }
+
+ /**
+ * This is the function invoked to handle each element.
+ * @param transaction transaction state;
+ * elements are written to Kafka in transactions to
guarantee different level of data consistency
+ * @param next element to handle
+ * @param context context needed to handle the element
+ * @throws FlinkKafkaException for kafka error
+ */
+ @Override
+ public void invoke(KafkaTransactionState transaction, IN next, Context
context) throws FlinkKafkaException {
+ checkErroneous();
+
+ // write timestamp to Kafka if timestamp is available
+ Long timestamp = context.timestamp();
+
+ int[] partitions = getPartitions(transaction);
+ int partitionIndex;
+ try {
+ partitionIndex = KeyGroupRangeAssignment
+
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length,
partitions.length);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to assign a partition
number to record");
+ }
+
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+ defaultTopicId, partitionIndex, timestamp, null, kafkaSerializer.serializeRecord(next, timestamp));
Review comment:
If method parameters do not fit on the line of invocation, all of them need to be chopped (including the first one), e.g. as sketched below.
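I.e. something along these lines (pure reformatting of the quoted statement):
```
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
	defaultTopicId,
	partitionIndex,
	timestamp,
	null,
	kafkaSerializer.serializeRecord(next, timestamp));
```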
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN>
{
+ private final KafkaSerializer<IN> kafkaSerializer;
+ private final KeySelector<IN, KEY> keySelector;
+ private final int numberOfPartitions;
+
+ FlinkKafkaShuffleProducer(
+ String defaultTopicId,
+ TypeInformationSerializationSchema<IN> schema,
+ Properties props,
+ KeySelector<IN, KEY> keySelector,
+ Semantic semantic,
+ int kafkaProducersPoolSize) {
+ super(defaultTopicId, (element, timestamp) -> null, props,
semantic, kafkaProducersPoolSize);
+
+ this.kafkaSerializer = new
KafkaSerializer<>(schema.getSerializer());
+ this.keySelector = keySelector;
+
+ Preconditions.checkArgument(
+ props.getProperty(PARTITION_NUMBER) != null,
+ "Missing partition number for Kafka Shuffle");
+ numberOfPartitions = PropertiesUtil.getInt(props,
PARTITION_NUMBER, Integer.MIN_VALUE);
+ }
+
+ /**
+ * This is the function invoked to handle each element.
+ * @param transaction transaction state;
+ * elements are written to Kafka in transactions to
guarantee different level of data consistency
+ * @param next element to handle
+ * @param context context needed to handle the element
+ * @throws FlinkKafkaException for kafka error
+ */
+ @Override
+ public void invoke(KafkaTransactionState transaction, IN next, Context
context) throws FlinkKafkaException {
+ checkErroneous();
+
+ // write timestamp to Kafka if timestamp is available
+ Long timestamp = context.timestamp();
+
+ int[] partitions = getPartitions(transaction);
+ int partitionIndex;
+ try {
+ partitionIndex = KeyGroupRangeAssignment
+
.assignKeyToParallelOperator(keySelector.getKey(next), partitions.length,
partitions.length);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to assign a partition
number to record");
+ }
+
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+ defaultTopicId, partitionIndex, timestamp, null,
kafkaSerializer.serializeRecord(next, timestamp));
+ pendingRecords.incrementAndGet();
+ transaction.getProducer().send(record, callback);
+ }
+
+ /**
+ * This is the function invoked to handle each watermark.
+ * @param watermark watermark to handle
+ * @throws FlinkKafkaException for kafka error
+ */
+ public void invoke(Watermark watermark) throws FlinkKafkaException {
+ checkErroneous();
+ KafkaTransactionState transaction = currentTransaction();
+
+ int[] partitions = getPartitions(transaction);
+ int subtask = getRuntimeContext().getIndexOfThisSubtask();
+
+ // broadcast watermark
+ long timestamp = watermark.getTimestamp();
+ for (int partition : partitions) {
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+ defaultTopicId, partition, timestamp, null, kafkaSerializer.serializeWatermark(watermark, subtask));
Review comment:
Chop here as well (same as above).
##########
File path:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
##########
@@ -73,7 +73,7 @@
/** The lock that guarantees that record emission and state updates are
atomic,
* from the view of taking a checkpoint. */
- private final Object checkpointLock;
+ protected final Object checkpointLock;
Review comment:
package-private?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+
+/**
+ * Flink Kafka Shuffle Producer Function.
+ * It is different from {@link FlinkKafkaProducer} in the way handling
elements and watermarks
+ */
+@Internal
+public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN>
{
+ private final KafkaSerializer<IN> kafkaSerializer;
+ private final KeySelector<IN, KEY> keySelector;
+ private final int numberOfPartitions;
+
+ FlinkKafkaShuffleProducer(
+ String defaultTopicId,
+ TypeInformationSerializationSchema<IN> schema,
Review comment:
Pass the serializer directly, e.g. as sketched below.
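A sketch of what that could look like; the constructor would take the `TypeSerializer` instead of the schema wrapper (which also avoids the `getSerializer()` API change commented on below). The body is copied from the quoted hunk, only the parameter type changes:
```
FlinkKafkaShuffleProducer(
		String defaultTopicId,
		TypeSerializer<IN> typeSerializer,
		Properties props,
		KeySelector<IN, KEY> keySelector,
		Semantic semantic,
		int kafkaProducersPoolSize) {
	super(defaultTopicId, (element, timestamp) -> null, props, semantic, kafkaProducersPoolSize);

	this.kafkaSerializer = new KafkaSerializer<>(typeSerializer);
	this.keySelector = keySelector;

	Preconditions.checkArgument(
		props.getProperty(PARTITION_NUMBER) != null,
		"Missing partition number for Kafka Shuffle");
	numberOfPartitions = PropertiesUtil.getInt(props, PARTITION_NUMBER, Integer.MIN_VALUE);
}
```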
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
##########
@@ -126,4 +126,8 @@ public boolean isEndOfStream(T nextElement) {
public TypeInformation<T> getProducedType() {
return typeInfo;
}
+
+ public TypeSerializer<T> getSerializer() {
Review comment:
Unneeded API change (see `KafkaShuffleProducer`).
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition>
{
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+ private final WatermarkHandler watermarkHandler;
Review comment:
Add a comment? Just to be symmetric with the other fields (not strictly needed).
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition>
{
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+ private final WatermarkHandler watermarkHandler;
+ // ------------------------------------------------------------------------
Review comment:
This line does not make any sense here.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair
together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+ static final String PRODUCER_PARALLELISM = "producer parallelism";
+ static final String PARTITION_NUMBER = "partition number";
+
+ /**
+ * Write to and read from a kafka shuffle with the partition decided by keys.
+ * Consumers should read partitions equal to the key group indices they are assigned.
Review comment:
Usually, after the first sentence (or the summary if it's more than one sentence), we have a line break and start the next paragraph with `<p>`, e.g. as sketched below.
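For example (text borrowed from the quoted javadoc; only the structure is the point):
```
/**
 * Writes to and reads from a Kafka shuffle with the partition decided by keys.
 *
 * <p>Consumers should read partitions equal to the key group indices they are assigned.
 * The number of partitions is the maximum parallelism of the receiving operator.
 */
```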
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair
together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+ static final String PRODUCER_PARALLELISM = "producer parallelism";
+ static final String PARTITION_NUMBER = "partition number";
+
+ /**
+ * Write to and read from a kafka shuffle with the partition decided by keys.
Review comment:
Method javadocs usually start in the third person form, e.g. `Writes to ...`; see also `DataStream`.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition>
{
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+ private final WatermarkHandler watermarkHandler;
+ //
------------------------------------------------------------------------
+
+ /** The schema to convert between Kafka's byte messages, and Flink's
objects. */
+ private final KafkaShuffleElementDeserializer<T> deserializer;
+
+ /** Serializer to serialize record. */
+ private final TypeSerializer<T> serializer;
+
+ /** The handover of data and exceptions between the consumer thread and
the task thread. */
+ private final Handover handover;
+
+ /** The thread that runs the actual KafkaConsumer and hand the record
batches to this fetcher. */
+ private final KafkaConsumerThread consumerThread;
+
+ /** Flag to mark the main work loop as alive. */
+ private volatile boolean running = true;
+
+ public KafkaShuffleFetcher(
+ SourceFunction.SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long>
assignedPartitionsWithInitialOffsets,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>>
watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>>
watermarksPunctuated,
+ ProcessingTimeService processingTimeProvider,
+ long autoWatermarkInterval,
+ ClassLoader userCodeClassLoader,
+ String taskNameWithSubtasks,
+ TypeSerializer<T> serializer,
+ Properties kafkaProperties,
+ long pollTimeout,
+ MetricGroup subtaskMetricGroup,
+ MetricGroup consumerMetricGroup,
+ boolean useMetrics,
+ int producerParallelism) throws Exception {
+ super(
+ sourceContext,
+ assignedPartitionsWithInitialOffsets,
+ watermarksPeriodic,
+ watermarksPunctuated,
+ processingTimeProvider,
+ autoWatermarkInterval,
+ userCodeClassLoader,
+ consumerMetricGroup,
+ useMetrics);
+ this.deserializer = new KafkaShuffleElementDeserializer<>();
+ this.serializer = serializer;
+ this.handover = new Handover();
+ this.consumerThread = new KafkaConsumerThread(
+ LOG,
+ handover,
+ kafkaProperties,
+ unassignedPartitionsQueue,
+ getFetcherName() + " for " + taskNameWithSubtasks,
+ pollTimeout,
+ useMetrics,
+ consumerMetricGroup,
+ subtaskMetricGroup);
+ this.watermarkHandler = new
WatermarkHandler(producerParallelism);
+ }
+
+ //
------------------------------------------------------------------------
+ // Fetcher work methods
+ //
------------------------------------------------------------------------
+
+ @Override
+ public void runFetchLoop() throws Exception {
+ try {
+ final Handover handover = this.handover;
+
+ // kick off the actual Kafka consumer
+ consumerThread.start();
+
+ while (running) {
+ // this blocks until we get the next records
+ // it automatically re-throws exceptions
encountered in the consumer thread
+ final ConsumerRecords<byte[], byte[]> records =
handover.pollNext();
+
+ // get the records for each topic partition
+ for (KafkaTopicPartitionState<TopicPartition>
partition : subscribedPartitionStates()) {
+ List<ConsumerRecord<byte[], byte[]>>
partitionRecords =
+
records.records(partition.getKafkaPartitionHandle());
+
+ for (ConsumerRecord<byte[], byte[]>
record : partitionRecords) {
+ final KafkaShuffleElement<T>
element = deserializer.deserialize(serializer, record);
+
+ // TODO: do we need to check
the end of stream if reaching the end watermark?
+
+ if (element.isRecord()) {
+ // timestamp is
inherent from upstream
+ // If using
ProcessTime, timestamp is going to be ignored (upstream does not include
timestamp as well)
+ // If using
IngestionTime, timestamp is going to be overwritten
+ // If using EventTime,
timestamp is going to be used
+ synchronized
(checkpointLock) {
+
KafkaShuffleRecord<T> elementAsRecord = element.asRecord();
+
sourceContext.collectWithTimestamp(
+
elementAsRecord.value,
+
elementAsRecord.timestamp == null ? record.timestamp() :
elementAsRecord.timestamp);
+
partition.setOffset(record.offset());
+ }
+ } else if
(element.isWatermark()) {
+ final
KafkaShuffleWatermark watermark = element.asWatermark();
+ Optional<Watermark>
newWatermark = watermarkHandler.checkAndGetNewWatermark(watermark);
+
newWatermark.ifPresent(sourceContext::emitWatermark);
+ }
Review comment:
This is the only difference from `KafkaFetcher`, right? That could be
overridden in an extracted `handleRecord` hook or similar, so the rest of the fetch loop
stays shared.
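Roughly what I have in mind (a sketch only; the method name and signature are
placeholders, not existing `KafkaFetcher` API):
```
// in the base fetcher: per-record handling becomes an overridable hook
protected void handleRecord(
		ConsumerRecord<byte[], byte[]> record,
		KafkaTopicPartitionState<TopicPartition> partition) throws Exception {
	// default behaviour: deserialize the record and emit it, as KafkaFetcher does today
}

// KafkaShuffleFetcher would then only override handleRecord() with the
// record/watermark branching above; the rest of runFetchLoop() stays shared.
```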
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleConsumer.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+
+/**
+ * Flink Kafka Shuffle Consumer Function.
+ */
+@Internal
+public class FlinkKafkaShuffleConsumer<T> extends FlinkKafkaConsumer<T> {
+ private final TypeSerializer<T> serializer;
+ private final int producerParallelism;
+
+ FlinkKafkaShuffleConsumer(String topic,
TypeInformationSerializationSchema<T> schema, Properties props) {
Review comment:
Same as in the producer: pass the serializer directly instead of re-deriving it inside.
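Something along these lines (a sketch only; the super call still needs the schema, so the
extra parameter is just illustrative):
```
FlinkKafkaShuffleConsumer(
		String topic,
		TypeInformationSerializationSchema<T> schema,
		TypeSerializer<T> serializer,
		Properties props) {
	super(topic, schema, props);
	// use the serializer that was handed in instead of re-deriving it from the schema
	this.serializer = Preconditions.checkNotNull(serializer);
	this.producerParallelism = PropertiesUtil.getInt(props, PRODUCER_PARALLELISM, Integer.MIN_VALUE);
}
```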
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair
together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+ static final String PRODUCER_PARALLELISM = "producer parallelism";
+ static final String PARTITION_NUMBER = "partition number";
+
+ /**
+ * Write to and read from a kafka shuffle with the partition decided by
keys.
+ * Consumers should read partitions equal to the key group indices they
are assigned.
+ * The number of partitions is the maximum parallelism of the receiving
operator.
+ * This version only supports numberOfPartitions = consumerParallelism.
+ *
+ * @param inputStream input stream to the kafka
+ * @param topic kafka topic
+ * @param producerParallelism parallelism of producer
+ * @param numberOfPartitions number of partitions
+ * @param properties Kafka properties
+ * @param fields key positions from inputStream
+ * @param <T> input type
+ */
+ public static <T> KeyedStream<T, Tuple> persistentKeyBy(
+ DataStream<T> inputStream,
+ String topic,
+ int producerParallelism,
+ int numberOfPartitions,
+ Properties properties,
+ int... fields) {
+ return persistentKeyBy(
+ inputStream, topic, producerParallelism,
numberOfPartitions, properties, keySelector(inputStream, fields));
Review comment:
Chop the argument list.
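For example (same call, just reformatted to one argument per line):
```
return persistentKeyBy(
	inputStream,
	topic,
	producerParallelism,
	numberOfPartitions,
	properties,
	keySelector(inputStream, fields));
```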
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/StreamKafkaShuffleSink.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * A customized {@link StreamOperator} for executing {@link
FlinkKafkaShuffleProducer} that handle
+ * both elements and watermarks. If the shuffle sink is determined to be
useful to other sinks in the future,
+ * we should abstract this operator to data stream api. For now, we keep the
operator this way to avoid
+ * public interface change.
+ */
+@Internal
+class StreamKafkaShuffleSink<IN> extends StreamSink<IN> {
+
+ public StreamKafkaShuffleSink(FlinkKafkaShuffleProducer
flinkKafkaShuffleProducer) {
+ super(flinkKafkaShuffleProducer);
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
+ this.currentWatermark = mark.getTimestamp();
Review comment:
This is already done by `super`, so the extra assignment is redundant. AFAIK you then
also don't need the change in `StreamSink`.
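I.e. the override could shrink to something like this (a sketch; only shuffle-specific
watermark handling, if any, would remain below the super call):
```
@Override
public void processWatermark(Watermark mark) throws Exception {
	// StreamSink#processWatermark already keeps currentWatermark up to date,
	// so no extra bookkeeping is needed here.
	super.processWatermark(mark);
}
```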
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleElement;
+import
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleElementDeserializer;
+import
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleRecord;
+import
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleWatermark;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.IngestionTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.fail;
+
+/**
+ * Simple End to End Test for Kafka.
+ */
+public class KafkaShuffleITCase extends KafkaShuffleBase {
+
+ @Rule
+ public final Timeout timeout = Timeout.millis(600000L);
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test no data is lost or duplicated end-2-end with the default
time characteristic: ProcessingTime
+ */
+
+ @Test
+ public void testSimpleProcessingTime() throws Exception {
+ testKafkaShuffle("test_simple", 200000, ProcessingTime);
+ }
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test no data is lost or duplicated end-2-end with time
characteristic: IngestionTime
+ */
+ @Test
+ public void testSimpleIngestionTime() throws Exception {
+ testKafkaShuffle("test_simple", 200000, IngestionTime);
+ }
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test no data is lost or duplicated end-2-end with time
characteristic: EventTime
+ */
+ @Test
+ public void testSimpleEventTime() throws Exception {
+ testKafkaShuffle("test_simple", 100000, EventTime);
+ }
+
+ /**
+ * Producer Parallelism = 2; Kafka Partition # = 3; Consumer
Parallelism = 3.
+ * To test data is partitioned to the right partition with time
characteristic: ProcessingTime
+ */
+ @Test
+ public void testAssignedToPartitionProcessingTime() throws Exception {
+ testAssignedToPartition("test_assigned_to_partition", 300000,
ProcessingTime);
+ }
+
+ /**
+ * Producer Parallelism = 2; Kafka Partition # = 3; Consumer
Parallelism = 3.
+ * To test data is partitioned to the right partition with time
characteristic: IngestionTime
+ */
+ @Test
+ public void testAssignedToPartitionIngestionTime() throws Exception {
+ testAssignedToPartition("test_assigned_to_partition", 300000,
IngestionTime);
+ }
+
+ /**
+ * Producer Parallelism = 2; Kafka Partition # = 3; Consumer
Parallelism = 3.
+ * To test data is partitioned to the right partition with time
characteristic: EventTime
+ */
+ @Test
+ public void testAssignedToPartitionEventTime() throws Exception {
+ testAssignedToPartition("test_assigned_to_partition", 100000,
EventTime);
+ }
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test value serialization and deserialization with time
characteristic: ProcessingTime
+ */
+ @Test
+ public void testSerDeProcessingTime() throws Exception {
+ testRecordSerDe(ProcessingTime);
+ }
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test value and watermark serialization and deserialization with
time characteristic: IngestionTime
+ */
+ @Test
+ public void testSerDeIngestionTime() throws Exception {
+ testRecordSerDe(IngestionTime);
+ }
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test value and watermark serialization and deserialization with
time characteristic: EventTime
+ */
+ @Test
+ public void testSerDeEventTime() throws Exception {
+ testRecordSerDe(EventTime);
+ }
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test value and watermark serialization and deserialization with
time characteristic: EventTime
+ */
+ @Test
+ public void testWatermarkBroadcasting() throws Exception {
+ final int numberOfPartitions = 3;
+ final int producerParallelism = 2;
+ final int numElementsPerProducer = 1000;
+
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ Map<Integer, Collection<ConsumerRecord<byte[], byte[]>>>
results = testKafkaShuffleProducer(
+ topic("test_watermark_broadcast", EventTime),
+ env,
+ numberOfPartitions,
+ producerParallelism,
+ numElementsPerProducer,
+ EventTime);
+ TypeSerializer<Tuple3<Integer, Long, Integer>> typeSerializer =
createTypeSerializer(env);
+ KafkaShuffleElementDeserializer<Tuple3<Integer, Long, Integer>>
deserializer = new KafkaShuffleElementDeserializer<>();
+
+ // Records in a single partition are kept in order
+ for (int p = 0; p < numberOfPartitions; p++) {
+ Collection<ConsumerRecord<byte[], byte[]>> records =
results.get(p);
+ Map<Integer, List<KafkaShuffleWatermark>> watermarks =
new HashMap<>();
+
+ for (ConsumerRecord<byte[], byte[]> consumerRecord :
records) {
+ Assert.assertNull(consumerRecord.key());
+ KafkaShuffleElement<Tuple3<Integer, Long,
Integer>> element =
+
deserializer.deserialize(typeSerializer, consumerRecord);
+ if (element.isRecord()) {
+ KafkaShuffleRecord<Tuple3<Integer,
Long, Integer>> record = element.asRecord();
+
Assert.assertEquals(record.getValue().f1.longValue(), INIT_TIMESTAMP +
record.getValue().f0);
+
Assert.assertEquals(record.getTimestamp().longValue(),
record.getValue().f1.longValue());
+ } else if (element.isWatermark()) {
+ KafkaShuffleWatermark watermark =
element.asWatermark();
+
watermarks.computeIfAbsent(watermark.getSubtask(), k -> new ArrayList<>());
+
watermarks.get(watermark.getSubtask()).add(watermark);
+ } else {
+ fail("KafkaShuffleElement is either
record or watermark");
+ }
+ }
+
+ // According to the setting how watermarks are
generated in this ITTest,
+ // every producer task emits a watermark corresponding
to each record + the end-of-event-time watermark.
+ // Hence each producer sub task generates
`numElementsPerProducer + 1` watermarks.
+ // Each producer sub task broadcasts these
`numElementsPerProducer + 1` watermarks to all partitions.
+ // Thus in total, each producer sub task emits
`(numElementsPerProducer + 1) * numberOfPartitions` watermarks.
+ // From the consumer side, each partition receives
`(numElementsPerProducer + 1) * producerParallelism` watermarks,
+ // with each producer sub task produces
`numElementsPerProducer + 1` watermarks.
+ // Besides, watermarks from the same producer sub task
should keep in order.
+ for (List<KafkaShuffleWatermark> subTaskWatermarks :
watermarks.values()) {
+ int index = 0;
+ Assert.assertEquals(numElementsPerProducer + 1,
subTaskWatermarks.size());
+ for (KafkaShuffleWatermark watermark :
subTaskWatermarks) {
+ if (index == numElementsPerProducer) {
+ // the last element is the
watermark that signifies end-of-event-time
+
Assert.assertEquals(watermark.getWatermark(),
Watermark.MAX_WATERMARK.getTimestamp());
+ } else {
+
Assert.assertEquals(watermark.getWatermark(), INIT_TIMESTAMP + index++);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Schema: (key, timestamp, source instance Id).
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1
+ * To test no data is lost or duplicated end-2-end
+ */
+ private void testKafkaShuffle(
+ String prefix, int numElementsPerProducer,
TimeCharacteristic timeCharacteristic) throws Exception {
+ String topic = topic(prefix, timeCharacteristic);
+ final int numberOfPartitions = 1;
+ final int producerParallelism = 1;
+
+ createTestTopic(topic, numberOfPartitions, 1);
+
+ final StreamExecutionEnvironment env =
createEnvironment(producerParallelism, timeCharacteristic);
+ createKafkaShuffle(
+ env, topic, numElementsPerProducer,
producerParallelism, timeCharacteristic, numberOfPartitions)
Review comment:
Chop the argument list (one argument per line), same as above.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
##########
@@ -35,9 +35,9 @@
private final SinkTransformation<T> transformation;
- @SuppressWarnings("unchecked")
protected DataStreamSink(DataStream<T> inputStream, StreamSink<T>
operator) {
- this.transformation = new
SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator,
inputStream.getExecutionEnvironment().getParallelism());
Review comment:
Unrelated changes; please revert them or pull them into a separate hotfix commit if you
deem them necessary (I don't).
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair
together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+ static final String PRODUCER_PARALLELISM = "producer parallelism";
+ static final String PARTITION_NUMBER = "partition number";
+
+ /**
+ * Write to and read from a kafka shuffle with the partition decided by
keys.
+ * Consumers should read partitions equal to the key group indices they
are assigned.
+ * The number of partitions is the maximum parallelism of the receiving
operator.
+ * This version only supports numberOfPartitions = consumerParallelism.
Review comment:
Add a description of when `producerParallelism` should be !=
`numberOfPartitions`, as that is non-trivial to decide.
Also, is there any support for simply going with the default degree of parallelism,
so that it can be changed through configuration?
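For example, a sentence in the javadoc along these lines (wording is only a suggestion;
please fill in the actual guidance where the placeholder is):
```
 * <p>{@code numberOfPartitions} must equal the maximum parallelism (number of key groups)
 * of the consuming side; {@code producerParallelism} is the parallelism of the writing
 * operator and may differ from it, e.g. <explain here when that is useful>.
```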
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleElement;
+import
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleElementDeserializer;
+import
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleRecord;
+import
org.apache.flink.streaming.connectors.kafka.internal.KafkaShuffleFetcher.KafkaShuffleWatermark;
+import org.apache.flink.util.PropertiesUtil;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.api.TimeCharacteristic.EventTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.IngestionTime;
+import static org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PARTITION_NUMBER;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle.PRODUCER_PARALLELISM;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.fail;
+
+/**
+ * Simple End to End Test for Kafka.
+ */
+public class KafkaShuffleITCase extends KafkaShuffleBase {
+
+ @Rule
+ public final Timeout timeout = Timeout.millis(600000L);
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test no data is lost or duplicated end-2-end with the default
time characteristic: ProcessingTime
+ */
+
+ @Test
+ public void testSimpleProcessingTime() throws Exception {
+ testKafkaShuffle("test_simple", 200000, ProcessingTime);
+ }
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test no data is lost or duplicated end-2-end with time
characteristic: IngestionTime
+ */
+ @Test
+ public void testSimpleIngestionTime() throws Exception {
+ testKafkaShuffle("test_simple", 200000, IngestionTime);
+ }
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test no data is lost or duplicated end-2-end with time
characteristic: EventTime
+ */
+ @Test
+ public void testSimpleEventTime() throws Exception {
+ testKafkaShuffle("test_simple", 100000, EventTime);
+ }
+
+ /**
+ * Producer Parallelism = 2; Kafka Partition # = 3; Consumer
Parallelism = 3.
+ * To test data is partitioned to the right partition with time
characteristic: ProcessingTime
+ */
+ @Test
+ public void testAssignedToPartitionProcessingTime() throws Exception {
+ testAssignedToPartition("test_assigned_to_partition", 300000,
ProcessingTime);
+ }
+
+ /**
+ * Producer Parallelism = 2; Kafka Partition # = 3; Consumer
Parallelism = 3.
+ * To test data is partitioned to the right partition with time
characteristic: IngestionTime
+ */
+ @Test
+ public void testAssignedToPartitionIngestionTime() throws Exception {
+ testAssignedToPartition("test_assigned_to_partition", 300000,
IngestionTime);
+ }
+
+ /**
+ * Producer Parallelism = 2; Kafka Partition # = 3; Consumer
Parallelism = 3.
+ * To test data is partitioned to the right partition with time
characteristic: EventTime
+ */
+ @Test
+ public void testAssignedToPartitionEventTime() throws Exception {
+ testAssignedToPartition("test_assigned_to_partition", 100000,
EventTime);
+ }
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test value serialization and deserialization with time
characteristic: ProcessingTime
+ */
+ @Test
+ public void testSerDeProcessingTime() throws Exception {
+ testRecordSerDe(ProcessingTime);
+ }
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test value and watermark serialization and deserialization with
time characteristic: IngestionTime
+ */
+ @Test
+ public void testSerDeIngestionTime() throws Exception {
+ testRecordSerDe(IngestionTime);
+ }
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test value and watermark serialization and deserialization with
time characteristic: EventTime
+ */
+ @Test
+ public void testSerDeEventTime() throws Exception {
+ testRecordSerDe(EventTime);
+ }
+
+ /**
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1.
+ * To test value and watermark serialization and deserialization with
time characteristic: EventTime
+ */
+ @Test
+ public void testWatermarkBroadcasting() throws Exception {
+ final int numberOfPartitions = 3;
+ final int producerParallelism = 2;
+ final int numElementsPerProducer = 1000;
+
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ Map<Integer, Collection<ConsumerRecord<byte[], byte[]>>>
results = testKafkaShuffleProducer(
+ topic("test_watermark_broadcast", EventTime),
+ env,
+ numberOfPartitions,
+ producerParallelism,
+ numElementsPerProducer,
+ EventTime);
+ TypeSerializer<Tuple3<Integer, Long, Integer>> typeSerializer =
createTypeSerializer(env);
+ KafkaShuffleElementDeserializer<Tuple3<Integer, Long, Integer>>
deserializer = new KafkaShuffleElementDeserializer<>();
+
+ // Records in a single partition are kept in order
+ for (int p = 0; p < numberOfPartitions; p++) {
+ Collection<ConsumerRecord<byte[], byte[]>> records =
results.get(p);
+ Map<Integer, List<KafkaShuffleWatermark>> watermarks =
new HashMap<>();
+
+ for (ConsumerRecord<byte[], byte[]> consumerRecord :
records) {
+ Assert.assertNull(consumerRecord.key());
+ KafkaShuffleElement<Tuple3<Integer, Long,
Integer>> element =
+
deserializer.deserialize(typeSerializer, consumerRecord);
+ if (element.isRecord()) {
+ KafkaShuffleRecord<Tuple3<Integer,
Long, Integer>> record = element.asRecord();
+
Assert.assertEquals(record.getValue().f1.longValue(), INIT_TIMESTAMP +
record.getValue().f0);
+
Assert.assertEquals(record.getTimestamp().longValue(),
record.getValue().f1.longValue());
+ } else if (element.isWatermark()) {
+ KafkaShuffleWatermark watermark =
element.asWatermark();
+
watermarks.computeIfAbsent(watermark.getSubtask(), k -> new ArrayList<>());
+
watermarks.get(watermark.getSubtask()).add(watermark);
+ } else {
+ fail("KafkaShuffleElement is either
record or watermark");
+ }
+ }
+
+ // According to the setting how watermarks are
generated in this ITTest,
+ // every producer task emits a watermark corresponding
to each record + the end-of-event-time watermark.
+ // Hence each producer sub task generates
`numElementsPerProducer + 1` watermarks.
+ // Each producer sub task broadcasts these
`numElementsPerProducer + 1` watermarks to all partitions.
+ // Thus in total, each producer sub task emits
`(numElementsPerProducer + 1) * numberOfPartitions` watermarks.
+ // From the consumer side, each partition receives
`(numElementsPerProducer + 1) * producerParallelism` watermarks,
+ // with each producer sub task produces
`numElementsPerProducer + 1` watermarks.
+ // Besides, watermarks from the same producer sub task
should keep in order.
+ for (List<KafkaShuffleWatermark> subTaskWatermarks :
watermarks.values()) {
+ int index = 0;
+ Assert.assertEquals(numElementsPerProducer + 1,
subTaskWatermarks.size());
+ for (KafkaShuffleWatermark watermark :
subTaskWatermarks) {
+ if (index == numElementsPerProducer) {
+ // the last element is the
watermark that signifies end-of-event-time
+
Assert.assertEquals(watermark.getWatermark(),
Watermark.MAX_WATERMARK.getTimestamp());
+ } else {
+
Assert.assertEquals(watermark.getWatermark(), INIT_TIMESTAMP + index++);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Schema: (key, timestamp, source instance Id).
+ * Producer Parallelism = 1; Kafka Partition # = 1; Consumer
Parallelism = 1
+ * To test no data is lost or duplicated end-2-end
+ */
+ private void testKafkaShuffle(
+ String prefix, int numElementsPerProducer,
TimeCharacteristic timeCharacteristic) throws Exception {
Review comment:
Chop the argument list (one argument per line), same as above.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaShuffleFetcher.java
##########
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITHOUT_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_REC_WITH_TIMESTAMP;
+import static
org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer.KafkaSerializer.TAG_WATERMARK;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Fetch data from Kafka for Kafka Shuffle.
+ */
+@Internal
+public class KafkaShuffleFetcher<T> extends AbstractFetcher<T, TopicPartition>
{
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaShuffleFetcher.class);
+
+ private final WatermarkHandler watermarkHandler;
+ //
------------------------------------------------------------------------
+
+ /** The schema to convert between Kafka's byte messages, and Flink's
objects. */
+ private final KafkaShuffleElementDeserializer<T> deserializer;
+
+ /** Serializer to serialize record. */
+ private final TypeSerializer<T> serializer;
+
+ /** The handover of data and exceptions between the consumer thread and
the task thread. */
+ private final Handover handover;
+
+ /** The thread that runs the actual KafkaConsumer and hand the record
batches to this fetcher. */
+ private final KafkaConsumerThread consumerThread;
+
+ /** Flag to mark the main work loop as alive. */
+ private volatile boolean running = true;
+
+ public KafkaShuffleFetcher(
+ SourceFunction.SourceContext<T> sourceContext,
+ Map<KafkaTopicPartition, Long>
assignedPartitionsWithInitialOffsets,
+ SerializedValue<AssignerWithPeriodicWatermarks<T>>
watermarksPeriodic,
+ SerializedValue<AssignerWithPunctuatedWatermarks<T>>
watermarksPunctuated,
+ ProcessingTimeService processingTimeProvider,
+ long autoWatermarkInterval,
+ ClassLoader userCodeClassLoader,
+ String taskNameWithSubtasks,
+ TypeSerializer<T> serializer,
+ Properties kafkaProperties,
+ long pollTimeout,
+ MetricGroup subtaskMetricGroup,
+ MetricGroup consumerMetricGroup,
+ boolean useMetrics,
+ int producerParallelism) throws Exception {
+ super(
+ sourceContext,
+ assignedPartitionsWithInitialOffsets,
+ watermarksPeriodic,
+ watermarksPunctuated,
+ processingTimeProvider,
+ autoWatermarkInterval,
+ userCodeClassLoader,
+ consumerMetricGroup,
+ useMetrics);
+ this.deserializer = new KafkaShuffleElementDeserializer<>();
+ this.serializer = serializer;
+ this.handover = new Handover();
+ this.consumerThread = new KafkaConsumerThread(
+ LOG,
+ handover,
+ kafkaProperties,
+ unassignedPartitionsQueue,
+ getFetcherName() + " for " + taskNameWithSubtasks,
+ pollTimeout,
+ useMetrics,
+ consumerMetricGroup,
+ subtaskMetricGroup);
+ this.watermarkHandler = new
WatermarkHandler(producerParallelism);
+ }
+
+ //
------------------------------------------------------------------------
+ // Fetcher work methods
+ //
------------------------------------------------------------------------
+
+ @Override
+ public void runFetchLoop() throws Exception {
+ try {
+ final Handover handover = this.handover;
+
+ // kick off the actual Kafka consumer
+ consumerThread.start();
+
+ while (running) {
+ // this blocks until we get the next records
+ // it automatically re-throws exceptions
encountered in the consumer thread
+ final ConsumerRecords<byte[], byte[]> records =
handover.pollNext();
+
+ // get the records for each topic partition
+ for (KafkaTopicPartitionState<TopicPartition>
partition : subscribedPartitionStates()) {
+ List<ConsumerRecord<byte[], byte[]>>
partitionRecords =
+
records.records(partition.getKafkaPartitionHandle());
+
+ for (ConsumerRecord<byte[], byte[]>
record : partitionRecords) {
+ final KafkaShuffleElement<T>
element = deserializer.deserialize(serializer, record);
+
+ // TODO: do we need to check
the end of stream if reaching the end watermark?
+
+ if (element.isRecord()) {
+ // timestamp is
inherent from upstream
+ // If using
ProcessTime, timestamp is going to be ignored (upstream does not include
timestamp as well)
+ // If using
IngestionTime, timestamp is going to be overwritten
+ // If using EventTime,
timestamp is going to be used
+ synchronized
(checkpointLock) {
+
KafkaShuffleRecord<T> elementAsRecord = element.asRecord();
+
sourceContext.collectWithTimestamp(
+
elementAsRecord.value,
+
elementAsRecord.timestamp == null ? record.timestamp() :
elementAsRecord.timestamp);
+
partition.setOffset(record.offset());
+ }
+ } else if
(element.isWatermark()) {
+ final
KafkaShuffleWatermark watermark = element.asWatermark();
+ Optional<Watermark>
newWatermark = watermarkHandler.checkAndGetNewWatermark(watermark);
+
newWatermark.ifPresent(sourceContext::emitWatermark);
+ }
+ }
+ }
+ }
+ }
+ finally {
+ // this signals the consumer thread that no more work
is to be done
+ consumerThread.shutdown();
+ }
+
+ // on a clean exit, wait for the runner thread
+ try {
+ consumerThread.join();
+ }
+ catch (InterruptedException e) {
+ // may be the result of a wake-up interruption after an
exception.
+ // we ignore this here and only restore the
interruption state
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // flag the main thread to exit. A thread interrupt will come
anyways.
+ running = false;
+ handover.close();
+ consumerThread.shutdown();
+ }
+
+ @Override
+ protected TopicPartition createKafkaPartitionHandle(KafkaTopicPartition
partition) {
+ return new TopicPartition(partition.getTopic(),
partition.getPartition());
+ }
+
+ @Override
+ protected void doCommitInternalOffsetsToKafka(
+ Map<KafkaTopicPartition, Long> offsets,
+ @Nonnull KafkaCommitCallback commitCallback) throws
Exception {
+ @SuppressWarnings("unchecked")
+ List<KafkaTopicPartitionState<TopicPartition>> partitions =
subscribedPartitionStates();
+
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new
HashMap<>(partitions.size());
+
+ for (KafkaTopicPartitionState<TopicPartition> partition :
partitions) {
+ Long lastProcessedOffset =
offsets.get(partition.getKafkaTopicPartition());
+ if (lastProcessedOffset != null) {
+ checkState(lastProcessedOffset >= 0, "Illegal
offset value to commit");
+
+ // committed offsets through the KafkaConsumer
need to be 1 more than the last processed offset.
+ // This does not affect Flink's
checkpoints/saved state.
+ long offsetToCommit = lastProcessedOffset + 1;
+
+
offsetsToCommit.put(partition.getKafkaPartitionHandle(), new
OffsetAndMetadata(offsetToCommit));
+ partition.setCommittedOffset(offsetToCommit);
+ }
+ }
+
+ // record the work to be committed by the main consumer thread
and make sure the consumer notices that
+ consumerThread.setOffsetsToCommit(offsetsToCommit,
commitCallback);
+ }
+
+ private String getFetcherName() {
+ return "Kafka Shuffle Fetcher";
+ }
+
+ /**
+ * An element in a KafkaShuffle. Can be a record or a Watermark.
+ */
+ @VisibleForTesting
+ public abstract static class KafkaShuffleElement<T> {
+
+ public boolean isRecord() {
+ return getClass() == KafkaShuffleRecord.class;
+ }
+
+ public boolean isWatermark() {
+ return getClass() == KafkaShuffleWatermark.class;
+ }
+
+ public KafkaShuffleRecord<T> asRecord() {
+ return (KafkaShuffleRecord<T>) this;
+ }
+
+ public KafkaShuffleWatermark asWatermark() {
+ return (KafkaShuffleWatermark) this;
+ }
+ }
+
+ /**
+ * A watermark element in a KafkaShuffle. It includes
+ * - subtask index where the watermark is coming from
+ * - watermark timestamp
+ */
+ @VisibleForTesting
+ public static class KafkaShuffleWatermark<T> extends
KafkaShuffleElement<T> {
+ final int subtask;
+ final long watermark;
+
+ KafkaShuffleWatermark(int subtask, long watermark) {
+ this.subtask = subtask;
+ this.watermark = watermark;
+ }
+
+ public int getSubtask() {
+ return subtask;
+ }
+
+ public long getWatermark() {
+ return watermark;
+ }
+ }
+
+ /**
+ * One value with Type T in a KafkaShuffle. This stores the value and
an optional associated timestamp.
+ */
+ @VisibleForTesting
+ public static class KafkaShuffleRecord<T> extends
KafkaShuffleElement<T> {
+ final T value;
+ final Long timestamp;
+
+ KafkaShuffleRecord(T value) {
+ this.value = value;
+ this.timestamp = null;
+ }
+
+ KafkaShuffleRecord(long timestamp, T value) {
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ public T getValue() {
+ return value;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+ }
+
+ /**
+ * Deserializer for KafkaShuffleElement.
+ * @param <T>
+ */
+ @VisibleForTesting
+ public static class KafkaShuffleElementDeserializer<T> implements
Serializable {
+ private transient DataInputDeserializer dis;
+
+ @VisibleForTesting
+ public KafkaShuffleElementDeserializer() {
+ this.dis = new DataInputDeserializer();
+ }
+
+ @VisibleForTesting
+ public KafkaShuffleElement<T> deserialize(TypeSerializer<T>
serializer, ConsumerRecord<byte[], byte[]> record)
+ throws Exception {
+ byte[] value = record.value();
+ dis.setBuffer(value);
+ int tag = IntSerializer.INSTANCE.deserialize(dis);
+
+ if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+ return new
KafkaShuffleRecord<>(serializer.deserialize(dis));
+ } else if (tag == TAG_REC_WITH_TIMESTAMP) {
+ return new
KafkaShuffleRecord<>(LongSerializer.INSTANCE.deserialize(dis),
serializer.deserialize(dis));
+ } else if (tag == TAG_WATERMARK) {
+ return new KafkaShuffleWatermark<>(
+
IntSerializer.INSTANCE.deserialize(dis),
LongSerializer.INSTANCE.deserialize(dis));
+ }
+
+ throw new UnsupportedOperationException("Unsupported
tag format");
+ }
+ }
+
+ /**
+ * WatermarkHandler to generate watermarks.
+ */
+ private static class WatermarkHandler {
+ private final int producerParallelism;
+ private final Map<Integer, Long> subtaskWatermark;
+
+ private long currentMinWatermark = Long.MIN_VALUE;
+
+ WatermarkHandler(int numberOfSubtask) {
Review comment:
The param name is very confusing; it should also be `producerParallelism`.
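I.e. (a sketch; only the parameter name changes, the constructor body stays as-is):
```
WatermarkHandler(int producerParallelism) {
	this.producerParallelism = producerParallelism;
	this.subtaskWatermark = new HashMap<>();
	// (body otherwise unchanged)
}
```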
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.shuffle;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.operators.Keys;
+import
org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.PropertiesUtil;
+
+import java.util.Properties;
+
+/**
+ * Use Kafka as a persistent shuffle by wrapping a Kafka Source/Sink pair
together.
+ */
+@Experimental
+class FlinkKafkaShuffle {
+ static final String PRODUCER_PARALLELISM = "producer parallelism";
+ static final String PARTITION_NUMBER = "partition number";
+
+ /**
+ * Write to and read from a kafka shuffle with the partition decided by
keys.
+ * Consumers should read partitions equal to the key group indices they
are assigned.
+ * The number of partitions is the maximum parallelism of the receiving
operator.
+ * This version only supports numberOfPartitions = consumerParallelism.
+ *
+ * @param inputStream input stream to the kafka
+ * @param topic kafka topic
+ * @param producerParallelism parallelism of producer
+ * @param numberOfPartitions number of partitions
+ * @param properties Kafka properties
+ * @param fields key positions from inputStream
+ * @param <T> input type
+ */
+ public static <T> KeyedStream<T, Tuple> persistentKeyBy(
+ DataStream<T> inputStream,
+ String topic,
+ int producerParallelism,
+ int numberOfPartitions,
+ Properties properties,
+ int... fields) {
+ return persistentKeyBy(
+ inputStream, topic, producerParallelism,
numberOfPartitions, properties, keySelector(inputStream, fields));
+ }
+
+ /**
+ * Write to and read from a kafka shuffle with the partition decided by
keys.
+ * Consumers should read partitions equal to the key group indices they
are assigned.
+ * The number of partitions is the maximum parallelism of the receiving
operator.
+ * This version only supports numberOfPartitions = consumerParallelism.
+ *
+ * @param inputStream input stream to the kafka
+ * @param topic kafka topic
+ * @param producerParallelism parallelism of producer
+ * @param numberOfPartitions number of partitions
+ * @param properties Kafka properties
+ * @param keySelector key(K) based on inputStream(T)
+ * @param <T> input type
+ * @param <K> key type
+ */
+ public static <T, K> KeyedStream<T, K> persistentKeyBy(
+ DataStream<T> inputStream,
+ String topic,
+ int producerParallelism,
+ int numberOfPartitions,
+ Properties properties,
+ KeySelector<T, K> keySelector) {
+ // KafkaProducer#propsToMap uses Properties purely as a HashMap
without considering the default properties
+ // So we have to flatten the default property to first level
elements.
+ Properties kafkaProperties = PropertiesUtil.flatten(properties);
+ kafkaProperties.setProperty(PRODUCER_PARALLELISM,
String.valueOf(producerParallelism));
+ kafkaProperties.setProperty(PARTITION_NUMBER,
String.valueOf(numberOfPartitions));
+
+ StreamExecutionEnvironment env =
inputStream.getExecutionEnvironment();
+ TypeInformationSerializationSchema<T> schema =
+ new
TypeInformationSerializationSchema<>(inputStream.getType(), env.getConfig());
+
+ writeKeyBy(inputStream, topic, kafkaProperties, keySelector);
+ return readKeyBy(topic, env, schema, kafkaProperties,
keySelector);
+ }
+
+ /**
+ * Write to a kafka shuffle with the partition decided by keys.
+ * @param inputStream input stream to the kafka
+ * @param topic kafka topic
+ * @param kafkaProperties kafka properties
+ * @param fields key positions from inputStream
+ * @param <T> input type
+ */
+ public static <T> void writeKeyBy(
+ DataStream<T> inputStream,
+ String topic,
+ Properties kafkaProperties,
+ int... fields) {
+ writeKeyBy(inputStream, topic, kafkaProperties,
keySelector(inputStream, fields));
+ }
+
+ /**
+ * Write to a kafka shuffle with the partition decided by keys.
+ * @param inputStream input stream to the kafka
+ * @param topic kafka topic
+ * @param kafkaProperties kafka properties
+ * @param keySelector key(K) based on input(T)
+ * @param <T> input type
+ * @param <K> key type
+ */
+ public static <T, K> void writeKeyBy(
+ DataStream<T> inputStream,
+ String topic,
+ Properties kafkaProperties,
+ KeySelector<T, K> keySelector) {
+ StreamExecutionEnvironment env =
inputStream.getExecutionEnvironment();
+ TypeInformationSerializationSchema<T> schema =
+ new
TypeInformationSerializationSchema<>(inputStream.getType(), env.getConfig());
+
+ // write data to Kafka
+ FlinkKafkaShuffleProducer<T, K> kafkaProducer = new FlinkKafkaShuffleProducer<>(
+ topic,
+ schema,
+ kafkaProperties,
+ env.clean(keySelector),
+ FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
+ FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+
+ // make sure the sink parallelism is set to producerParallelism
+ Preconditions.checkArgument(
+ kafkaProperties.getProperty(PRODUCER_PARALLELISM) != null,
+ "Missing producer parallelism for Kafka Shuffle");
+ int producerParallelism = PropertiesUtil.getInt(kafkaProperties, PRODUCER_PARALLELISM, Integer.MIN_VALUE);
+
+ addKafkaShuffle(inputStream, kafkaProducer, producerParallelism);
+ }
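
Since this is the variant `persistentKeyBy` delegates to, a direct caller has to supply the producer-parallelism entry itself. A sketch of such a call (assuming `input` is a `DataStream<Tuple2<Integer, Long>>`, the statics live in `FlinkKafkaShuffle`, and the `PRODUCER_PARALLELISM`/`PARTITION_NUMBER` constants are visible to the caller, which may not hold if they stay package-private):
```
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// writeKeyBy checks for this entry; persistentKeyBy sets it on the caller's behalf
props.setProperty(FlinkKafkaShuffle.PRODUCER_PARALLELISM, "2");
// needed later by the reading side
props.setProperty(FlinkKafkaShuffle.PARTITION_NUMBER, "4");

// key records by their first tuple field
KeySelector<Tuple2<Integer, Long>, Integer> byFirstField =
    new KeySelector<Tuple2<Integer, Long>, Integer>() {
        @Override
        public Integer getKey(Tuple2<Integer, Long> value) {
            return value.f0;
        }
    };

FlinkKafkaShuffle.writeKeyBy(input, "shuffle-topic", props, byFirstField);
```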
+
+ /**
+ * Read data from a Kafka Shuffle.
+ * Consumers should read partitions equal to the key group indices they are assigned.
+ * The number of partitions is the maximum parallelism of the receiving operator.
+ * This version only supports numberOfPartitions = consumerParallelism.
+ *
+ * @param topic Kafka topic
+ * @param env streaming execution environment; the readKeyBy environment can be different from the writeKeyBy environment
+ * @param schema the record schema to read
+ * @param kafkaProperties Kafka properties
+ * @param keySelector key selector to extract the key (K) from the deserialized records (T)
+ * @param <T> schema type
+ * @param <K> key type
+ * @return keyed data stream
+ */
+ public static <T, K> KeyedStream<T, K> readKeyBy(
+ String topic,
+ StreamExecutionEnvironment env,
+ TypeInformationSerializationSchema<T> schema,
+ Properties kafkaProperties,
+ KeySelector<T, K> keySelector) {
+ SourceFunction<T> kafkaConsumer = new FlinkKafkaShuffleConsumer<>(topic, schema, kafkaProperties);
+
+ // TODO: consider situations where numberOfPartitions != consumerParallelism
+ Preconditions.checkArgument(
+ kafkaProperties.getProperty(PARTITION_NUMBER) != null,
+ "Missing partition number for Kafka Shuffle");
+ int numberOfPartitions = PropertiesUtil.getInt(kafkaProperties, PARTITION_NUMBER, Integer.MIN_VALUE);
+ DataStream<T> outputDataStream = env.addSource(kafkaConsumer).setParallelism(numberOfPartitions);
+
+ return DataStreamUtils.reinterpretAsKeyedStream(outputDataStream, keySelector);
+ }
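
Because `readKeyBy` takes its own `StreamExecutionEnvironment`, the reading side can run in a different job than the writing side. A sketch under the same assumptions as the previous one, reusing `props` and the `byFirstField` selector (the selector must match the one used on the write side):
```
// a separate environment/job that only consumes the shuffle topic
StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();

TypeInformationSerializationSchema<Tuple2<Integer, Long>> schema =
    new TypeInformationSerializationSchema<>(
        TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {}), readEnv.getConfig());

// the consumer parallelism is set to the PARTITION_NUMBER entry carried in props
KeyedStream<Tuple2<Integer, Long>, Integer> keyed =
    FlinkKafkaShuffle.readKeyBy("shuffle-topic", readEnv, schema, props, byFirstField);

keyed.sum(1).print();
readEnv.execute("kafka-shuffle-reader-sketch");
```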
+
+ /**
+ * Add a {@link StreamKafkaShuffleSink} to {@link DataStream}.
+ * {@link StreamKafkaShuffleSink} is associated with a {@link FlinkKafkaShuffleProducer}.
+ *
+ * @param inputStream the input data stream connected to the shuffle
+ * @param kafkaShuffleProducer Kafka shuffle sink function that can handle both records and watermarks
+ * @param producerParallelism the number of tasks writing to the Kafka shuffle
+ */
+ private static <T, K> void addKafkaShuffle(
+ DataStream<T> inputStream, FlinkKafkaShuffleProducer<T, K> kafkaShuffleProducer, int producerParallelism) {
+
+ // read the output type of the input Transform to coax out errors about MissingTypeInfo
+ inputStream.getTransformation().getOutputType();
+
+ StreamKafkaShuffleSink<T> shuffleSinkOperator = new StreamKafkaShuffleSink<>(kafkaShuffleProducer);
+ SinkTransformation<T> transformation = new SinkTransformation<>(
+ inputStream.getTransformation(),
+ "kafka_shuffle",
+ shuffleSinkOperator,
+ inputStream.getExecutionEnvironment().getParallelism());
+
+ inputStream.getExecutionEnvironment().addOperator(transformation);
+ transformation.setParallelism(producerParallelism);
+ }
+
+ // A better place to put this function is DataStream; but put it here for now to avoid changing DataStream
Review comment:
I wouldn't mind as this only changes internals.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]