This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 57985782d0 [INLONG-10317][Sort] Make Kafka source support report audit information exactly once (#10550)
57985782d0 is described below
commit 57985782d0a074ad869d8061c3e870b6b18d3928
Author: XiaoYou201 <[email protected]>
AuthorDate: Wed Jul 3 10:10:55 2024 +0800
[INLONG-10317][Sort] Make Kafka source support report audit information exactly once (#10550)
---
.../sort-flink-v1.15/sort-connectors/kafka/pom.xml | 1 +
.../inlong/sort/kafka/source/KafkaSource.java | 239 +++++++++
.../sort/kafka/source/KafkaSourceBuilder.java | 534 +++++++++++++++++++++
.../kafka/source/reader/KafkaSourceReader.java | 217 +++++++++
.../table/DynamicKafkaDeserializationSchema.java | 81 ++--
.../sort/kafka/table/KafkaDynamicSource.java | 11 +-
licenses/inlong-sort-connectors/LICENSE | 3 +
7 files changed, 1047 insertions(+), 39 deletions(-)
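
For context, a minimal sketch (not part of the patch itself) of how the forked source below is meant to be wired up. The builder methods, including the new setMetricSchema(...), come from the files added in this commit; the bootstrap servers, topic, group id and the kafkaDeserializer instance are illustrative placeholders:

    // org.apache.inlong.sort.kafka.source.KafkaSource (the copy added below), not the Flink class
    KafkaSource<RowData> source = KafkaSource.<RowData>builder()
            .setBootstrapServers("localhost:9092")                    // placeholder address
            .setTopics("demo_topic")                                  // placeholder topic
            .setGroupId("inlong_demo_group")                          // placeholder group id
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer))
            .setMetricSchema(kafkaDeserializer)                       // new hook: lets the reader flush audit data on checkpoints
            .build();

Here kafkaDeserializer stands for the DynamicKafkaDeserializationSchema that KafkaDynamicSource builds; the KafkaDynamicSource hunk at the end of this patch makes exactly these two setDeserializer/setMetricSchema calls.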
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
index 019b0f456c..2ecb22476e 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
@@ -84,6 +84,7 @@
<include>org.apache.httpcomponents:*</include>
<include>org.apache.commons:commons-lang3</include>
<include>com.google.protobuf:*</include>
+ <include>com.google.guava:*</include>
<include>joda-time:*</include>
<include>com.fasterxml.jackson.core:*</include>
<include>com.amazonaws:*</include>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
new file mode 100644
index 0000000000..5004ec34a3
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.source;
+
+import org.apache.inlong.sort.kafka.source.reader.KafkaSourceReader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
+import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
+import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
+import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
+import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
+import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * The Source implementation of Kafka. Please use a {@link KafkaSourceBuilder} to construct a {@link
+ * KafkaSource}. The following example shows how to create a KafkaSource emitting records of <code>
+ * String</code> type.
+ *
+ * <pre>{@code
+ * KafkaSource<String> source = KafkaSource
+ * .<String>builder()
+ * .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
+ * .setGroupId("MyGroup")
+ * .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ * .setDeserializer(new TestingKafkaRecordDeserializationSchema())
+ * .setStartingOffsets(OffsetsInitializer.earliest())
+ * .build();
+ * }</pre>
+ *
+ * <p>See {@link KafkaSourceBuilder} for more details.
+ *
+ * @param <OUT> the output type of the source.
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * Add a variable metricSchema to report audit information
+ */
+@PublicEvolving
+public class KafkaSource<OUT>
+ implements
+ Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
+ ResultTypeQueryable<OUT> {
+
+ private static final long serialVersionUID = -8755372893283732098L;
+    // Users can choose only one of the following ways to specify the topics to consume from.
+ private final KafkaSubscriber subscriber;
+ // Users can specify the starting / stopping offset initializer.
+ private final OffsetsInitializer startingOffsetsInitializer;
+ private final OffsetsInitializer stoppingOffsetsInitializer;
+ // Boundedness
+ private final Boundedness boundedness;
+ private final KafkaRecordDeserializationSchema<OUT> deserializationSchema;
+ private final KafkaDeserializationSchema<RowData> metricSchema;
+ // The configurations.
+ private final Properties props;
+
+ KafkaSource(
+ KafkaSubscriber subscriber,
+ OffsetsInitializer startingOffsetsInitializer,
+ @Nullable OffsetsInitializer stoppingOffsetsInitializer,
+ Boundedness boundedness,
+ KafkaRecordDeserializationSchema<OUT> deserializationSchema,
+ KafkaDeserializationSchema<RowData> metricSchema,
+ Properties props) {
+ this.subscriber = subscriber;
+ this.startingOffsetsInitializer = startingOffsetsInitializer;
+ this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
+ this.boundedness = boundedness;
+ this.deserializationSchema = deserializationSchema;
+ this.metricSchema = metricSchema;
+ this.props = props;
+ }
+
+ /**
+ * Get a kafkaSourceBuilder to build a {@link KafkaSource}.
+ *
+ * @return a Kafka source builder.
+ */
+ public static <OUT> KafkaSourceBuilder<OUT> builder() {
+ return new KafkaSourceBuilder<>();
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return this.boundedness;
+ }
+
+ @Internal
+ @Override
+    public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext readerContext)
+ throws Exception {
+ return createReader(readerContext, (ignore) -> {
+ });
+ }
+
+ @VisibleForTesting
+ SourceReader<OUT, KafkaPartitionSplit> createReader(
+            SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook)
+ throws Exception {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue =
+ new FutureCompletingBlockingQueue<>();
+ deserializationSchema.open(
+ new DeserializationSchema.InitializationContext() {
+
+ @Override
+ public MetricGroup getMetricGroup() {
+                        return readerContext.metricGroup().addGroup("deserializer");
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return readerContext.getUserCodeClassLoader();
+ }
+ });
+ final KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
+ new KafkaSourceReaderMetrics(readerContext.metricGroup());
+
+ Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
+                () -> new KafkaPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics);
+        KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
+
+ return new KafkaSourceReader<>(
+ elementsQueue,
+ new KafkaSourceFetcherManager(
+                        elementsQueue, splitReaderSupplier::get, splitFinishedHook),
+ recordEmitter,
+ toConfiguration(props),
+ readerContext,
+ kafkaSourceReaderMetrics,
+ metricSchema);
+ }
+
+ @Internal
+ @Override
+    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator(
+ SplitEnumeratorContext<KafkaPartitionSplit> enumContext) {
+ return new KafkaSourceEnumerator(
+ subscriber,
+ startingOffsetsInitializer,
+ stoppingOffsetsInitializer,
+ props,
+ enumContext,
+ boundedness);
+ }
+
+ @Internal
+ @Override
+    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumerator(
+ SplitEnumeratorContext<KafkaPartitionSplit> enumContext,
+ KafkaSourceEnumState checkpoint)
+ throws IOException {
+ return new KafkaSourceEnumerator(
+ subscriber,
+ startingOffsetsInitializer,
+ stoppingOffsetsInitializer,
+ props,
+ enumContext,
+ boundedness,
+ checkpoint.assignedPartitions());
+ }
+
+ @Internal
+ @Override
+    public SimpleVersionedSerializer<KafkaPartitionSplit> getSplitSerializer() {
+ return new KafkaPartitionSplitSerializer();
+ }
+
+ @Internal
+ @Override
+    public SimpleVersionedSerializer<KafkaSourceEnumState> getEnumeratorCheckpointSerializer() {
+ return new KafkaSourceEnumStateSerializer();
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+
+ // ----------- private helper methods ---------------
+
+ private Configuration toConfiguration(Properties props) {
+ Configuration config = new Configuration();
+        props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key)));
+ return config;
+ }
+
+ @VisibleForTesting
+ Configuration getConfiguration() {
+ return toConfiguration(props);
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
new file mode 100644
index 0000000000..58bb651b24
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
+import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The @builder class for {@link KafkaSource} to make it easier for the users to construct a {@link
+ * KafkaSource}.
+ *
+ * <p>The following example shows the minimum setup to create a KafkaSource that reads the String
+ * values from a Kafka topic.
+ *
+ * <pre>{@code
+ * KafkaSource<String> source = KafkaSource
+ * .<String>builder()
+ * .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
+ * .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ * .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ * .build();
+ * }</pre>
+ *
+ * <p>The bootstrap servers, topics/partitions to consume, and the record deserializer are required
+ * fields that must be set.
+ *
+ * <p>To specify the starting offsets of the KafkaSource, one can call {@link
+ * #setStartingOffsets(OffsetsInitializer)}.
+ *
+ * <p>By default the KafkaSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never
+ * stops until the Flink job is canceled or fails. To let the KafkaSource run in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link
+ * #setUnbounded(OffsetsInitializer)}. For example the following KafkaSource stops after it consumes
+ * up to the latest partition offsets at the point when the Flink started.
+ *
+ * <pre>{@code
+ * KafkaSource<String> source = KafkaSource
+ * .<String>builder()
+ * .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
+ * .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ * .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ * .setUnbounded(OffsetsInitializer.latest())
+ * .build();
+ * }</pre>
+ *
+ * <p>Check the Java docs of each individual methods to learn more about the settings to build a
+ * KafkaSource.
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * Add a variable metricSchema to report audit information
+ */
+@PublicEvolving
+public class KafkaSourceBuilder<OUT> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceBuilder.class);
+    private static final String[] REQUIRED_CONFIGS = {ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG};
+ // The subscriber specifies the partitions to subscribe to.
+ private KafkaSubscriber subscriber;
+ // Users can specify the starting / stopping offset initializer.
+ private OffsetsInitializer startingOffsetsInitializer;
+ private OffsetsInitializer stoppingOffsetsInitializer;
+ // Boundedness
+ private Boundedness boundedness;
+ private KafkaRecordDeserializationSchema<OUT> deserializationSchema;
+ private KafkaDeserializationSchema<RowData> metricSchema;
+ // The configurations.
+ protected Properties props;
+
+ KafkaSourceBuilder() {
+ this.subscriber = null;
+ this.startingOffsetsInitializer = OffsetsInitializer.earliest();
+ this.stoppingOffsetsInitializer = new NoStoppingOffsetsInitializer();
+ this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ this.deserializationSchema = null;
+ this.metricSchema = null;
+ this.props = new Properties();
+ }
+
+ /**
+ * Sets the bootstrap servers for the KafkaConsumer of the KafkaSource.
+ *
+ * @param bootstrapServers the bootstrap servers of the Kafka cluster.
+ * @return this KafkaSourceBuilder.
+ */
+    public KafkaSourceBuilder<OUT> setBootstrapServers(String bootstrapServers) {
+        return setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ }
+
+ /**
+ * Sets the consumer group id of the KafkaSource.
+ *
+ * @param groupId the group id of the KafkaSource.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setGroupId(String groupId) {
+ return setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ }
+
+ /**
+     * Set a list of topics the KafkaSource should consume from. All the topics in the list should
+     * have existed in the Kafka cluster. Otherwise an exception will be thrown. To allow some of
+     * the topics to be created lazily, please use {@link #setTopicPattern(Pattern)} instead.
+ *
+ * @param topics the list of topics to consume from.
+ * @return this KafkaSourceBuilder.
+     * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)
+ */
+ public KafkaSourceBuilder<OUT> setTopics(List<String> topics) {
+ ensureSubscriberIsNull("topics");
+ subscriber = KafkaSubscriber.getTopicListSubscriber(topics);
+ return this;
+ }
+
+ /**
+     * Set a list of topics the KafkaSource should consume from. All the topics in the list should
+     * have existed in the Kafka cluster. Otherwise an exception will be thrown. To allow some of
+     * the topics to be created lazily, please use {@link #setTopicPattern(Pattern)} instead.
+ *
+ * @param topics the list of topics to consume from.
+ * @return this KafkaSourceBuilder.
+     * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)
+ */
+ public KafkaSourceBuilder<OUT> setTopics(String... topics) {
+ return setTopics(Arrays.asList(topics));
+ }
+
+ /**
+ * Set a topic pattern to consume from use the java {@link Pattern}.
+ *
+ * @param topicPattern the pattern of the topic name to consume from.
+ * @return this KafkaSourceBuilder.
+ * @see org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Pattern)
+ */
+ public KafkaSourceBuilder<OUT> setTopicPattern(Pattern topicPattern) {
+ ensureSubscriberIsNull("topic pattern");
+ subscriber = KafkaSubscriber.getTopicPatternSubscriber(topicPattern);
+ return this;
+ }
+
+ /**
+ * Set a set of partitions to consume from.
+ *
+ * @param partitions the set of partitions to consume from.
+ * @return this KafkaSourceBuilder.
+ * @see org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection)
+ */
+    public KafkaSourceBuilder<OUT> setPartitions(Set<TopicPartition> partitions) {
+ ensureSubscriberIsNull("partitions");
+ subscriber = KafkaSubscriber.getPartitionSetSubscriber(partitions);
+ return this;
+ }
+
+ /**
+     * Specify from which offsets the KafkaSource should start consume from by providing an {@link
+     * OffsetsInitializer}.
+     *
+     * <p>The following {@link OffsetsInitializer}s are commonly used and provided out of the box.
+     * Users can also implement their own {@link OffsetsInitializer} for custom behaviors.
+     *
+     * <ul>
+     *     <li>{@link OffsetsInitializer#earliest()} - starting from the earliest offsets. This is
+     *         also the default {@link OffsetsInitializer} of the KafkaSource for starting offsets.
+     *     <li>{@link OffsetsInitializer#latest()} - starting from the latest offsets.
+     *     <li>{@link OffsetsInitializer#committedOffsets()} - starting from the committed offsets of
+     *         the consumer group.
+     *     <li>{@link
+     *         OffsetsInitializer#committedOffsets(org.apache.kafka.clients.consumer.OffsetResetStrategy)}
+     *         - starting from the committed offsets of the consumer group. If there is no committed
+     *         offsets, starting from the offsets specified by the {@link
+     *         org.apache.kafka.clients.consumer.OffsetResetStrategy OffsetResetStrategy}.
+     *     <li>{@link OffsetsInitializer#offsets(Map)} - starting from the specified offsets for each
+     *         partition.
+     *     <li>{@link OffsetsInitializer#timestamp(long)} - starting from the specified timestamp for
+     *         each partition. Note that the guarantee here is that all the records in Kafka whose
+     *         {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater than
+     *         the given starting timestamp will be consumed. However, it is possible that some
+     *         consumer records whose timestamp is smaller than the given starting timestamp are also
+     *         consumed.
+     * </ul>
+     *
+     * @param startingOffsetsInitializer the {@link OffsetsInitializer} setting the starting offsets
+     *     for the Source.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setStartingOffsets(
+ OffsetsInitializer startingOffsetsInitializer) {
+ this.startingOffsetsInitializer = startingOffsetsInitializer;
+ return this;
+ }
+
+ /**
+     * By default the KafkaSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run as
+     * a streaming source but still stops at some point, one can set an {@link OffsetsInitializer}
+     * to specify the stopping offsets for each partition. When all the partitions have reached
+     * their stopping offsets, the KafkaSource will then exit.
+     *
+     * <p>This method is different from {@link #setBounded(OffsetsInitializer)} that after setting
+     * the stopping offsets with this method, {@link KafkaSource#getBoundedness()} will still return
+     * {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will stop at the stopping offsets
+     * specified by the stopping offsets {@link OffsetsInitializer}.
+     *
+     * <p>The following {@link OffsetsInitializer} are commonly used and provided out of the box.
+     * Users can also implement their own {@link OffsetsInitializer} for custom behaviors.
+     *
+     * <ul>
+     *     <li>{@link OffsetsInitializer#latest()} - stop at the latest offsets of the partitions when
+     *         the KafkaSource starts to run.
+     *     <li>{@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the
+     *         consumer group.
+     *     <li>{@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each
+     *         partition.
+     *     <li>{@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each
+     *         partition. The guarantee of setting the stopping timestamp is that no Kafka records
+     *         whose {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater
+     *         than the given stopping timestamp will be consumed. However, it is possible that some
+     *         records whose timestamp is smaller than the specified stopping timestamp are not
+     *         consumed.
+     * </ul>
+     *
+     * @param stoppingOffsetsInitializer The {@link OffsetsInitializer} to specify the stopping
+     *     offset.
+     * @return this KafkaSourceBuilder.
+     * @see #setBounded(OffsetsInitializer)
+     */
+    public KafkaSourceBuilder<OUT> setUnbounded(OffsetsInitializer stoppingOffsetsInitializer) {
+ this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+ this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
+ return this;
+ }
+
+ /**
+     * By default the KafkaSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * and thus never stops until the Flink job fails or is canceled. To let the KafkaSource run in
+     * {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link
+     * OffsetsInitializer} to specify the stopping offsets for each partition. When all the
+     * partitions have reached their stopping offsets, the KafkaSource will then exit.
+     *
+     * <p>This method is different from {@link #setUnbounded(OffsetsInitializer)} that after setting
+     * the stopping offsets with this method, {@link KafkaSource#getBoundedness()} will return
+     * {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}.
+     *
+     * <p>The following {@link OffsetsInitializer} are commonly used and provided out of the box.
+     * Users can also implement their own {@link OffsetsInitializer} for custom behaviors.
+     *
+     * <ul>
+     *     <li>{@link OffsetsInitializer#latest()} - stop at the latest offsets of the partitions when
+     *         the KafkaSource starts to run.
+     *     <li>{@link OffsetsInitializer#committedOffsets()} - stops at the committed offsets of the
+     *         consumer group.
+     *     <li>{@link OffsetsInitializer#offsets(Map)} - stops at the specified offsets for each
+     *         partition.
+     *     <li>{@link OffsetsInitializer#timestamp(long)} - stops at the specified timestamp for each
+     *         partition. The guarantee of setting the stopping timestamp is that no Kafka records
+     *         whose {@link org.apache.kafka.clients.consumer.ConsumerRecord#timestamp()} is greater
+     *         than the given stopping timestamp will be consumed. However, it is possible that some
+     *         records whose timestamp is smaller than the specified stopping timestamp are not
+     *         consumed.
+     * </ul>
+     *
+     * @param stoppingOffsetsInitializer the {@link OffsetsInitializer} to specify the stopping
+     *     offsets.
+     * @return this KafkaSourceBuilder.
+     * @see #setUnbounded(OffsetsInitializer)
+     */
+    public KafkaSourceBuilder<OUT> setBounded(OffsetsInitializer stoppingOffsetsInitializer) {
+ this.boundedness = Boundedness.BOUNDED;
+ this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
+ return this;
+ }
+
+ /**
+     * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link
+     * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource.
+ *
+ * @param recordDeserializer the deserializer for Kafka {@link
+ * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setDeserializer(
+ KafkaRecordDeserializationSchema<OUT> recordDeserializer) {
+ this.deserializationSchema = recordDeserializer;
+ return this;
+ }
+
+ /**
+     * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link
+     * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. The given
+     * {@link DeserializationSchema} will be used to deserialize the value of ConsumerRecord. The
+     * other information (e.g. key) in a ConsumerRecord will be ignored.
+     *
+     * @param deserializationSchema the {@link DeserializationSchema} to use for deserialization.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
+ DeserializationSchema<OUT> deserializationSchema) {
+ this.deserializationSchema =
+                KafkaRecordDeserializationSchema.valueOnly(deserializationSchema);
+ return this;
+ }
+
+ /**
+ * Sets the client id prefix of this KafkaSource.
+ *
+ * @param prefix the client id prefix to use for this KafkaSource.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setClientIdPrefix(String prefix) {
+ return setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), prefix);
+ }
+
+ /**
+     * Set an arbitrary property for the KafkaSource and KafkaConsumer. The valid keys can be found
+     * in {@link ConsumerConfig} and {@link KafkaSourceOptions}.
+     *
+     * <p>Note that the following keys will be overridden by the builder when the KafkaSource is
+     * created.
+     *
+     * <ul>
+     *     <li><code>key.deserializer</code> is always set to {@link ByteArrayDeserializer}.
+     *     <li><code>value.deserializer</code> is always set to {@link ByteArrayDeserializer}.
+     *     <li><code>auto.offset.reset.strategy</code> is overridden by {@link
+     *         OffsetsInitializer#getAutoOffsetResetStrategy()} for the starting offsets, which is by
+     *         default {@link OffsetsInitializer#earliest()}.
+     *     <li><code>partition.discovery.interval.ms</code> is overridden to -1 when {@link
+     *         #setBounded(OffsetsInitializer)} has been invoked.
+ * </ul>
+ *
+ * @param key the key of the property.
+ * @param value the value of the property.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setProperty(String key, String value) {
+ props.setProperty(key, value);
+ return this;
+ }
+
+ /**
+     * Set arbitrary properties for the KafkaSource and KafkaConsumer. The valid keys can be found
+     * in {@link ConsumerConfig} and {@link KafkaSourceOptions}.
+     *
+     * <p>Note that the following keys will be overridden by the builder when the KafkaSource is
+     * created.
+     *
+     * <ul>
+     *     <li><code>key.deserializer</code> is always set to {@link ByteArrayDeserializer}.
+     *     <li><code>value.deserializer</code> is always set to {@link ByteArrayDeserializer}.
+     *     <li><code>auto.offset.reset.strategy</code> is overridden by {@link
+     *         OffsetsInitializer#getAutoOffsetResetStrategy()} for the starting offsets, which is by
+     *         default {@link OffsetsInitializer#earliest()}.
+     *     <li><code>partition.discovery.interval.ms</code> is overridden to -1 when {@link
+     *         #setBounded(OffsetsInitializer)} has been invoked.
+     *     <li><code>client.id</code> is overridden to the "client.id.prefix-RANDOM_LONG", or
+     *         "group.id-RANDOM_LONG" if the client id prefix is not set.
+ * </ul>
+ *
+ * @param props the properties to set for the KafkaSource.
+ * @return this KafkaSourceBuilder.
+ */
+ public KafkaSourceBuilder<OUT> setProperties(Properties props) {
+ this.props.putAll(props);
+ return this;
+ }
+
+    public KafkaSourceBuilder<OUT> setMetricSchema(KafkaDeserializationSchema<RowData> metricSchema) {
+ this.metricSchema = metricSchema;
+ return this;
+ }
+
+ /**
+ * Build the {@link KafkaSource}.
+ *
+ * @return a KafkaSource with the settings made for this builder.
+ */
+ public KafkaSource<OUT> build() {
+ sanityCheck();
+ parseAndSetRequiredProperties();
+ return new KafkaSource<>(
+ subscriber,
+ startingOffsetsInitializer,
+ stoppingOffsetsInitializer,
+ boundedness,
+ deserializationSchema,
+ metricSchema,
+ props);
+ }
+
+ // ------------- private helpers --------------
+
+ private void ensureSubscriberIsNull(String attemptingSubscribeMode) {
+ if (subscriber != null) {
+ throw new IllegalStateException(
+ String.format(
+                        "Cannot use %s for consumption because a %s is already set for consumption.",
+                        attemptingSubscribeMode, subscriber.getClass().getSimpleName()));
+ }
+ }
+
+ private void parseAndSetRequiredProperties() {
+ maybeOverride(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName(),
+ true);
+ maybeOverride(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName(),
+ true);
+ if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+ LOG.warn(
+                    "Offset commit on checkpoint is disabled because {} is not specified",
+                    ConsumerConfig.GROUP_ID_CONFIG);
+            maybeOverride(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false", false);
+        }
+        maybeOverride(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false", false);
+        maybeOverride(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
+ true);
+
+ // If the source is bounded, do not run periodic partition discovery.
+ maybeOverride(
+ KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
+ "-1",
+ boundedness == Boundedness.BOUNDED);
+
+        // If the client id prefix is not set, reuse the consumer group id as the client id prefix,
+ // or generate a random string if consumer group id is not specified.
+ maybeOverride(
+ KafkaSourceOptions.CLIENT_ID_PREFIX.key(),
+ props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)
+ ? props.getProperty(ConsumerConfig.GROUP_ID_CONFIG)
+ : "KafkaSource-" + new Random().nextLong(),
+ false);
+ }
+
+ private boolean maybeOverride(String key, String value, boolean override) {
+ boolean overridden = false;
+ String userValue = props.getProperty(key);
+ if (userValue != null) {
+ if (override) {
+ LOG.warn(
+ String.format(
+                                "Property %s is provided but will be overridden from %s to %s",
+ key, userValue, value));
+ props.setProperty(key, value);
+ overridden = true;
+ }
+ } else {
+ props.setProperty(key, value);
+ }
+ return overridden;
+ }
+
+ private void sanityCheck() {
+ // Check required configs.
+ for (String requiredConfig : REQUIRED_CONFIGS) {
+ checkNotNull(
+ props.getProperty(requiredConfig),
+                    String.format("Property %s is required but not provided", requiredConfig));
+ }
+ // Check required settings.
+ checkNotNull(
+ subscriber,
+ "No subscribe mode is specified, "
+                        + "should be one of topics, topic pattern and partition set.");
+        checkNotNull(deserializationSchema, "Deserialization schema is required but not provided.");
+ // Check consumer group ID
+ checkState(
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || !offsetCommitEnabledManually(),
+                String.format(
+                        "Property %s is required when offset commit is enabled",
+ ConsumerConfig.GROUP_ID_CONFIG));
+ // Check offsets initializers
+        if (startingOffsetsInitializer instanceof OffsetsInitializerValidator) {
+            ((OffsetsInitializerValidator) startingOffsetsInitializer).validate(props);
+        }
+        if (stoppingOffsetsInitializer instanceof OffsetsInitializerValidator) {
+            ((OffsetsInitializerValidator) stoppingOffsetsInitializer).validate(props);
+ }
+ }
+
+ private boolean offsetCommitEnabledManually() {
+ boolean autoCommit =
+ props.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
+ && Boolean.parseBoolean(
+                                props.getProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+        boolean commitOnCheckpoint =
+                props.containsKey(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key())
+                        && Boolean.parseBoolean(
+                                props.getProperty(
+                                        KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key()));
+ return autoCommit || commitOnCheckpoint;
+ }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
new file mode 100644
index 0000000000..4643887c49
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.source.reader;
+
+import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
+import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/** The source reader for Kafka partitions.
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * Add some methods to report audit information exactly once
+ * */
+@Internal
+public class KafkaSourceReader<T>
+ extends
+        SingleThreadMultiplexSourceReaderBase<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplit, KafkaPartitionSplitState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceReader.class);
+    // These maps need to be concurrent because it will be accessed by both the main thread
+    // and the split fetcher thread in the callback.
+    private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit;
+    private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits;
+ private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
+ private final boolean commitOffsetsOnCheckpoint;
+ private final KafkaDeserializationSchema<RowData> metricSchema;
+
+ public KafkaSourceReader(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue,
+            KafkaSourceFetcherManager kafkaSourceFetcherManager,
+            RecordEmitter<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplitState> recordEmitter,
+            Configuration config,
+            SourceReaderContext context,
+            KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
+            KafkaDeserializationSchema<RowData> metricSchema) {
+        super(elementsQueue, kafkaSourceFetcherManager, recordEmitter, config, context);
+        this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
+ this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
+ this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
+ this.commitOffsetsOnCheckpoint =
+ config.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
+ this.metricSchema = metricSchema;
+ if (!commitOffsetsOnCheckpoint) {
+ LOG.warn(
+ "Offset commit on checkpoint is disabled. "
+                            + "Consuming offset will not be reported back to Kafka cluster.");
+ }
+ }
+
+ @Override
+    protected void onSplitFinished(Map<String, KafkaPartitionSplitState> finishedSplitIds) {
+ finishedSplitIds.forEach(
+ (ignored, splitState) -> {
+ if (splitState.getCurrentOffset() >= 0) {
+ offsetsOfFinishedSplits.put(
+ splitState.getTopicPartition(),
+                                new OffsetAndMetadata(splitState.getCurrentOffset()));
+ }
+ });
+ }
+
+ @Override
+ public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
+ if (metricSchema instanceof DynamicKafkaDeserializationSchema) {
+            ((DynamicKafkaDeserializationSchema) metricSchema).updateCurrentCheckpointId(checkpointId);
+ }
+ List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
+ if (!commitOffsetsOnCheckpoint) {
+ return splits;
+ }
+
+ if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) {
+ offsetsToCommit.put(checkpointId, Collections.emptyMap());
+ } else {
+ Map<TopicPartition, OffsetAndMetadata> offsetsMap =
+                    offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
+            // Put the offsets of the active splits.
+            for (KafkaPartitionSplit split : splits) {
+                // If the checkpoint is triggered before the partition starting offsets
+                // is retrieved, do not commit the offsets for those partitions.
+ if (split.getStartingOffset() >= 0) {
+ offsetsMap.put(
+ split.getTopicPartition(),
+ new OffsetAndMetadata(split.getStartingOffset()));
+ }
+ }
+ // Put offsets of all the finished splits.
+ offsetsMap.putAll(offsetsOfFinishedSplits);
+ }
+ return splits;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ LOG.debug("Committing offsets for checkpoint {}", checkpointId);
+ if (!commitOffsetsOnCheckpoint) {
+ flushAudit(checkpointId);
+ return;
+ }
+
+ Map<TopicPartition, OffsetAndMetadata> committedPartitions =
+ offsetsToCommit.get(checkpointId);
+ if (committedPartitions == null) {
+ LOG.debug(
+                    "Offsets for checkpoint {} either do not exist or have already been committed.",
+ checkpointId);
+ flushAudit(checkpointId);
+ return;
+ }
+
+ ((KafkaSourceFetcherManager) splitFetcherManager)
+ .commitOffsets(
+ committedPartitions,
+ (ignored, e) -> {
+                            // The offset commit here is needed by the external monitoring. It won't
+                            // break Flink job's correctness if we fail to commit the offset here.
+                            if (e != null) {
+                                kafkaSourceReaderMetrics.recordFailedCommit();
+                                LOG.warn(
+                                        "Failed to commit consumer offsets for checkpoint {}",
+                                        checkpointId,
+                                        e);
+                            } else {
+                                LOG.debug(
+                                        "Successfully committed offsets for checkpoint {}",
+                                        checkpointId);
+                                kafkaSourceReaderMetrics.recordSucceededCommit();
+                                // If the finished topic partition has been committed, we remove it
+                                // from the offsets of the finished splits map.
+                                committedPartitions.forEach(
+                                        (tp, offset) -> kafkaSourceReaderMetrics.recordCommittedOffset(
+                                                tp, offset.offset()));
+                                offsetsOfFinishedSplits
+                                        .entrySet()
+                                        .removeIf(
+                                                entry -> committedPartitions.containsKey(
+                                                        entry.getKey()));
+                                while (!offsetsToCommit.isEmpty()
+                                        && offsetsToCommit.firstKey() <= checkpointId) {
+                                    offsetsToCommit.remove(offsetsToCommit.firstKey());
+ }
+ }
+ });
+ flushAudit(checkpointId);
+ }
+
+ private void flushAudit(long checkpointId) {
+ if (metricSchema instanceof DynamicKafkaDeserializationSchema) {
+            DynamicKafkaDeserializationSchema schema = (DynamicKafkaDeserializationSchema) metricSchema;
+ schema.flushAudit();
+ schema.updateLastCheckpointId(checkpointId);
+ }
+ }
+ @Override
+    protected KafkaPartitionSplitState initializedState(KafkaPartitionSplit split) {
+ return new KafkaPartitionSplitState(split);
+ }
+
+ @Override
+    protected KafkaPartitionSplit toSplitType(String splitId, KafkaPartitionSplitState splitState) {
+ return splitState.toKafkaPartitionSplit();
+ }
+
+ // ------------------------
+
+ @VisibleForTesting
+    SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> getOffsetsToCommit() {
+ return offsetsToCommit;
+ }
+
+ @VisibleForTesting
+ int getNumAliveFetchers() {
+ return splitFetcherManager.getNumAliveFetchers();
+ }
+}
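
The reader above ties audit reporting to the checkpoint lifecycle: snapshotState() stamps the metric schema with the checkpoint being taken, and notifyCheckpointComplete() flushes the buffered audit counters and records the last completed checkpoint id, which is how the patch aims to make the reporting exactly once. A condensed, illustrative view of that hand-off (method names as in the patch, bodies simplified):

    // inside KafkaSourceReader (simplified sketch, not the full code above)
    public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
        if (metricSchema instanceof DynamicKafkaDeserializationSchema) {
            // tag the counters collected so far with the checkpoint being taken
            ((DynamicKafkaDeserializationSchema) metricSchema).updateCurrentCheckpointId(checkpointId);
        }
        return super.snapshotState(checkpointId);
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // ... offset commit as shown above ...
        if (metricSchema instanceof DynamicKafkaDeserializationSchema) {
            DynamicKafkaDeserializationSchema schema = (DynamicKafkaDeserializationSchema) metricSchema;
            schema.flushAudit();                         // send the counters for the completed checkpoint
            schema.updateLastCheckpointId(checkpointId); // remember they have been reported
        }
    }

DynamicKafkaDeserializationSchema forwards these calls to SourceExactlyMetric, as the next file in the diff shows.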
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index 4406189081..28de46fb08 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.kafka.table;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -38,11 +38,13 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.inlong.sort.kafka.table.KafkaDynamicSource.ReadableMetadata.CONSUME_TIME;
+
/** A specific {KafkaSerializationSchema} for {KafkaDynamicSource}.
* <p>
* Copy from org.apache.flink:flink-connector-kafka:1.15.4
* */
-class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
+public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
@@ -62,9 +64,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
private final MetricOption metricOption;
- private SourceMetricData sourceMetricData;
-
- private int consumeTimeIndex = -1;
+ private SourceExactlyMetric sourceExactlyMetric;
DynamicKafkaDeserializationSchema(
int physicalArity,
@@ -76,7 +76,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
boolean upsertMode,
- MetricOption metricOption) {
+ MetricOption metricOption,
+ List<String> metadataKeys) {
if (upsertMode) {
Preconditions.checkArgument(
keyDeserialization != null && keyProjection.length > 0,
@@ -92,7 +93,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
keyProjection,
valueProjection,
metadataConverters,
- upsertMode);
+ upsertMode,
+ metadataKeys);
this.producedTypeInfo = producedTypeInfo;
this.upsertMode = upsertMode;
this.metricOption = metricOption;
@@ -105,13 +107,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
}
valueDeserialization.open(context);
if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption);
- }
- for (int i = 0; i < outputCollector.metadataConverters.length; i++) {
- if (outputCollector.metadataConverters[i]
-                    .equals(KafkaDynamicSource.ReadableMetadata.CONSUME_TIME.converter)) {
- consumeTimeIndex = i;
- }
+ sourceExactlyMetric = new SourceExactlyMetric(metricOption);
}
}
@@ -121,7 +117,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
}
@Override
-    public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
+    public RowData deserialize(ConsumerRecord<byte[], byte[]> record) {
+        throw new IllegalStateException("A collector is required for deserializing.");
}
@@ -132,10 +128,9 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
// also not for a cartesian product with the keys
if (keyDeserialization == null && !hasMetadata) {
valueDeserialization.deserialize(record.value(),
-                    sourceMetricData == null ? collector : new MetricsCollector<>(collector, sourceMetricData));
+                    sourceExactlyMetric == null ? collector : new MetricsCollector<>(collector, sourceExactlyMetric));
return;
}
-
// buffer key(s)
if (keyDeserialization != null) {
keyDeserialization.deserialize(record.key(), keyCollector);
@@ -143,10 +138,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
// project output while emitting values
outputCollector.inputRecord = record;
outputCollector.physicalKeyRows = keyCollector.buffer;
- if (sourceMetricData != null) {
-            MetricsCollector<RowData> metricsCollector = new MetricsCollector<>(collector, sourceMetricData);
-            metricsCollector.resetTimestamp(getRecordTime(outputCollector.metadataConverters, record));
-            outputCollector.outputCollector = metricsCollector;
+        if (sourceExactlyMetric != null) {
+            outputCollector.outputCollector = new MetricsCollector<>(collector, sourceExactlyMetric);
} else {
outputCollector.outputCollector = collector;
}
@@ -159,14 +152,6 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
keyCollector.buffer.clear();
}
- private Long getRecordTime(MetadataConverter[] metadataConverters,
- ConsumerRecord<byte[], byte[]> record) {
- if (consumeTimeIndex == -1) {
- return System.currentTimeMillis();
- }
- return (Long) metadataConverters[consumeTimeIndex].read(record);
- }
-
@Override
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;
@@ -198,6 +183,24 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
}
}
+ public void flushAudit() {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.flushAudit();
+ }
+ }
+
+ public void updateCurrentCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+ }
+ }
+
+ public void updateLastCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+ }
+ }
+
     // --------------------------------------------------------------------------------------------
/**
@@ -236,30 +239,34 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
private transient Collector<RowData> outputCollector;
+ private final List<String> metadataKeys;
+
OutputProjectionCollector(
int physicalArity,
int[] keyProjection,
int[] valueProjection,
MetadataConverter[] metadataConverters,
- boolean upsertMode) {
+ boolean upsertMode,
+ List<String> metadataKeys) {
this.physicalArity = physicalArity;
this.keyProjection = keyProjection;
this.valueProjection = valueProjection;
this.metadataConverters = metadataConverters;
this.upsertMode = upsertMode;
+ this.metadataKeys = metadataKeys;
}
@Override
public void collect(RowData physicalValueRow) {
// no key defined
if (keyProjection.length == 0) {
- emitRow(null, (GenericRowData) physicalValueRow);
+ emitRow(null, (GenericRowData) physicalValueRow, metadataKeys);
return;
}
// otherwise emit a value for each key
for (RowData physicalKeyRow : physicalKeyRows) {
- emitRow((GenericRowData) physicalKeyRow, (GenericRowData)
physicalValueRow);
+ emitRow((GenericRowData) physicalKeyRow, (GenericRowData)
physicalValueRow, metadataKeys);
}
}
@@ -270,7 +277,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
private void emitRow(
@Nullable GenericRowData physicalKeyRow,
- @Nullable GenericRowData physicalValueRow) {
+ @Nullable GenericRowData physicalValueRow,
+ List<String> metadataKeys) {
final RowKind rowKind;
if (physicalValueRow == null) {
if (upsertMode) {
@@ -300,9 +308,14 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
}
             for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+                Object metadata = metadataConverters[metadataPos].read(inputRecord);
producedRow.setField(
physicalArity + metadataPos,
- metadataConverters[metadataPos].read(inputRecord));
+ metadata);
+ if (CONSUME_TIME.key.equals(metadataKeys.get(metadataPos)) &&
+ outputCollector instanceof MetricsCollector) {
+                    ((MetricsCollector<RowData>) outputCollector).resetTimestamp((Long) metadata);
+ }
}
outputCollector.collect(producedRow);
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index 322e6fc758..9b0b0aff64 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -18,6 +18,8 @@
package org.apache.inlong.sort.kafka.table;
import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.kafka.source.KafkaSource;
+import org.apache.inlong.sort.kafka.source.KafkaSourceBuilder;
+import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
import org.apache.inlong.sort.protocol.node.ExtractNode;
@@ -26,8 +28,6 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -442,8 +442,8 @@ public class KafkaDynamicSource
}
kafkaSourceBuilder
.setProperties(properties)
-                .setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer));
-
+                .setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer))
+ .setMetricSchema(kafkaDeserializer);
return kafkaSourceBuilder.build();
}
@@ -502,7 +502,8 @@ public class KafkaDynamicSource
metadataConverters,
producedTypeInfo,
upsertMode,
- metricOption);
+ metricOption,
+ metadataKeys);
}
private @Nullable DeserializationSchema<RowData> createDeserialization(
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index e9b2f0b4cd..e4c4590fd4 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -831,6 +831,9 @@
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSource.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/KafkaSourceBuilder.java
+ inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/source/reader/KafkaSourceReader.java
 Source : org.apache.flink:flink-connector-kafka:1.15.4 (Please note that the software have been modified.)
License : https://github.com/apache/flink/blob/master/LICENSE