This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b9c9688fc5 [INLONG-10159][Sort] Kafka connector support audit ID
(#10180)
b9c9688fc5 is described below
commit b9c9688fc5bba2a4d94baa33eefc1f3c99889a4b
Author: XiaoYou201 <[email protected]>
AuthorDate: Thu May 16 13:58:27 2024 +0800
[INLONG-10159][Sort] Kafka connector support audit ID (#10180)
---
.../table/DynamicKafkaDeserializationSchema.java | 289 ++++++++++
.../DynamicKafkaRecordSerializationSchema.java | 200 +++++++
.../kafka/table/KafkaConnectorOptionsUtil.java | 298 +++++-----
.../inlong/sort/kafka/table/KafkaDynamicSink.java | 496 ++++++++++++++++
.../sort/kafka/table/KafkaDynamicSource.java | 637 +++++++++++++++++++++
.../sort/kafka/table/KafkaDynamicTableFactory.java | 80 +--
.../sort/kafka/{ => table}/KafkaOptions.java | 7 +-
.../sort/kafka/table/ReducingUpsertSink.java | 101 ++++
.../sort/kafka/table/ReducingUpsertWriter.java | 200 +++++++
.../table/UpsertKafkaDynamicTableFactory.java | 47 +-
licenses/inlong-sort-connectors/LICENSE | 11 +-
11 files changed, 2151 insertions(+), 215 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
new file mode 100644
index 0000000000..7863321c9e
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** A specific {KafkaSerializationSchema} for {KafkaDynamicSource}.
+ * <p>
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * */
+class DynamicKafkaDeserializationSchema implements
KafkaDeserializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final @Nullable DeserializationSchema<RowData> keyDeserialization;
+
+ private final DeserializationSchema<RowData> valueDeserialization;
+
+ private final boolean hasMetadata;
+
+ private final BufferingCollector keyCollector;
+
+ private final OutputProjectionCollector outputCollector;
+
+ private final TypeInformation<RowData> producedTypeInfo;
+
+ private final boolean upsertMode;
+
+ private final MetricOption metricOption;
+
+ private SourceMetricData sourceMetricData;
+ DynamicKafkaDeserializationSchema(
+ int physicalArity,
+ @Nullable DeserializationSchema<RowData> keyDeserialization,
+ int[] keyProjection,
+ DeserializationSchema<RowData> valueDeserialization,
+ int[] valueProjection,
+ boolean hasMetadata,
+ MetadataConverter[] metadataConverters,
+ TypeInformation<RowData> producedTypeInfo,
+ boolean upsertMode,
+ MetricOption metricOption) {
+ if (upsertMode) {
+ Preconditions.checkArgument(
+ keyDeserialization != null && keyProjection.length > 0,
+ "Key must be set in upsert mode for deserialization
schema.");
+ }
+ this.keyDeserialization = keyDeserialization;
+ this.valueDeserialization = valueDeserialization;
+ this.hasMetadata = hasMetadata;
+ this.keyCollector = new BufferingCollector();
+ this.outputCollector =
+ new OutputProjectionCollector(
+ physicalArity,
+ keyProjection,
+ valueProjection,
+ metadataConverters,
+ upsertMode);
+ this.producedTypeInfo = producedTypeInfo;
+ this.upsertMode = upsertMode;
+ this.metricOption = metricOption;
+ }
+
+ @Override
+ public void open(DeserializationSchema.InitializationContext context)
throws Exception {
+ if (keyDeserialization != null) {
+ keyDeserialization.open(context);
+ }
+ valueDeserialization.open(context);
+ if (metricOption != null) {
+ sourceMetricData = new SourceMetricData(metricOption);
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws
Exception {
+ throw new IllegalStateException("A collector is required for
deserializing.");
+ }
+
+ @Override
+ public void deserialize(ConsumerRecord<byte[], byte[]> record,
Collector<RowData> collector)
+ throws Exception {
+ // shortcut in case no output projection is required,
+ // also not for a cartesian product with the keys
+ if (keyDeserialization == null && !hasMetadata) {
+ valueDeserialization.deserialize(record.value(),
+ sourceMetricData == null ? collector : new
MetricsCollector<>(collector, sourceMetricData));
+ return;
+ }
+
+ // buffer key(s)
+ if (keyDeserialization != null) {
+ keyDeserialization.deserialize(record.key(), keyCollector);
+ }
+
+ // project output while emitting values
+ outputCollector.inputRecord = record;
+ outputCollector.physicalKeyRows = keyCollector.buffer;
+ outputCollector.outputCollector =
+ sourceMetricData == null ? collector : new
MetricsCollector<>(collector, sourceMetricData);
+ if (record.value() == null && upsertMode) {
+ // collect tombstone messages in upsert mode by hand
+ outputCollector.collect(null);
+ } else {
+ valueDeserialization.deserialize(record.value(), outputCollector);
+ }
+ keyCollector.buffer.clear();
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return producedTypeInfo;
+ }
+
+ //
--------------------------------------------------------------------------------------------
+
+ interface MetadataConverter extends Serializable {
+
+ Object read(ConsumerRecord<?, ?> record);
+ }
+
+ //
--------------------------------------------------------------------------------------------
+
+ private static final class BufferingCollector implements
Collector<RowData>, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final List<RowData> buffer = new ArrayList<>();
+
+ @Override
+ public void collect(RowData record) {
+ buffer.add(record);
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+ }
+
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Emits a row with key, value, and metadata fields.
+ *
+ * <p>The collector is able to handle the following kinds of keys:
+ *
+ * <ul>
+ * <li>No key is used.
+ * <li>A key is used.
+ * <li>The deserialization schema emits multiple keys.
+ * <li>Keys and values have overlapping fields.
+ * <li>Keys are used and value is null.
+ * </ul>
+ */
+ private static final class OutputProjectionCollector
+ implements
+ Collector<RowData>,
+ Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int physicalArity;
+
+ private final int[] keyProjection;
+
+ private final int[] valueProjection;
+
+ private final MetadataConverter[] metadataConverters;
+
+ private final boolean upsertMode;
+
+ private transient ConsumerRecord<?, ?> inputRecord;
+
+ private transient List<RowData> physicalKeyRows;
+
+ private transient Collector<RowData> outputCollector;
+
+ OutputProjectionCollector(
+ int physicalArity,
+ int[] keyProjection,
+ int[] valueProjection,
+ MetadataConverter[] metadataConverters,
+ boolean upsertMode) {
+ this.physicalArity = physicalArity;
+ this.keyProjection = keyProjection;
+ this.valueProjection = valueProjection;
+ this.metadataConverters = metadataConverters;
+ this.upsertMode = upsertMode;
+ }
+
+ @Override
+ public void collect(RowData physicalValueRow) {
+ // no key defined
+ if (keyProjection.length == 0) {
+ emitRow(null, (GenericRowData) physicalValueRow);
+ return;
+ }
+
+ // otherwise emit a value for each key
+ for (RowData physicalKeyRow : physicalKeyRows) {
+ emitRow((GenericRowData) physicalKeyRow, (GenericRowData)
physicalValueRow);
+ }
+ }
+
+ @Override
+ public void close() {
+ // nothing to do
+ }
+
+ private void emitRow(
+ @Nullable GenericRowData physicalKeyRow,
+ @Nullable GenericRowData physicalValueRow) {
+ final RowKind rowKind;
+ if (physicalValueRow == null) {
+ if (upsertMode) {
+ rowKind = RowKind.DELETE;
+ } else {
+ throw new DeserializationException(
+ "Invalid null value received in non-upsert mode.
Could not to set row kind for output record.");
+ }
+ } else {
+ rowKind = physicalValueRow.getRowKind();
+ }
+
+ final int metadataArity = metadataConverters.length;
+ final GenericRowData producedRow =
+ new GenericRowData(rowKind, physicalArity + metadataArity);
+
+ for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
+ assert physicalKeyRow != null;
+ producedRow.setField(keyProjection[keyPos],
physicalKeyRow.getField(keyPos));
+ }
+
+ if (physicalValueRow != null) {
+ for (int valuePos = 0; valuePos < valueProjection.length;
valuePos++) {
+ producedRow.setField(
+ valueProjection[valuePos],
physicalValueRow.getField(valuePos));
+ }
+ }
+
+ for (int metadataPos = 0; metadataPos < metadataArity;
metadataPos++) {
+ producedRow.setField(
+ physicalArity + metadataPos,
+ metadataConverters[metadataPos].read(inputRecord));
+ }
+ outputCollector.collect(producedRow);
+ }
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaRecordSerializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaRecordSerializationSchema.java
new file mode 100644
index 0000000000..f1966dd33d
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
+
+/** SerializationSchema used by {@link KafkaDynamicSink} to configure a
{KafkaSink}.
+ * <p>
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * */
+class DynamicKafkaRecordSerializationSchema implements
KafkaRecordSerializationSchema<RowData> {
+
+ private final String topic;
+ private final FlinkKafkaPartitioner<RowData> partitioner;
+ @Nullable
+ private final SerializationSchema<RowData> keySerialization;
+ private final SerializationSchema<RowData> valueSerialization;
+ private final RowData.FieldGetter[] keyFieldGetters;
+ private final RowData.FieldGetter[] valueFieldGetters;
+ private final boolean hasMetadata;
+ private final int[] metadataPositions;
+ private final boolean upsertMode;
+ private final MetricOption metricOption;
+ private SinkMetricData sinkMetricData;
+
+ DynamicKafkaRecordSerializationSchema(
+ String topic,
+ @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+ @Nullable SerializationSchema<RowData> keySerialization,
+ SerializationSchema<RowData> valueSerialization,
+ RowData.FieldGetter[] keyFieldGetters,
+ RowData.FieldGetter[] valueFieldGetters,
+ boolean hasMetadata,
+ int[] metadataPositions,
+ boolean upsertMode,
+ MetricOption metricOption) {
+ if (upsertMode) {
+ Preconditions.checkArgument(
+ keySerialization != null && keyFieldGetters.length > 0,
+ "Key must be set in upsert mode for serialization
schema.");
+ }
+ this.topic = checkNotNull(topic);
+ this.partitioner = partitioner;
+ this.keySerialization = keySerialization;
+ this.valueSerialization = checkNotNull(valueSerialization);
+ this.keyFieldGetters = keyFieldGetters;
+ this.valueFieldGetters = valueFieldGetters;
+ this.hasMetadata = hasMetadata;
+ this.metadataPositions = metadataPositions;
+ this.upsertMode = upsertMode;
+ this.metricOption = metricOption;
+
+ }
+
+ @Override
+ public ProducerRecord<byte[], byte[]> serialize(
+ RowData consumedRow, KafkaSinkContext context, Long timestamp) {
+ // shortcut in case no input projection is required
+ if (keySerialization == null && !hasMetadata) {
+ final byte[] valueSerialized =
valueSerialization.serialize(consumedRow);
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+ topic,
+ extractPartition(
+ consumedRow,
+ null,
+ valueSerialized,
+ context.getPartitionsForTopic(topic)),
+ null,
+ valueSerialized);
+ if (sinkMetricData != null) {
+ sinkMetricData.invoke(1, getDataSize(record));
+ }
+ return record;
+ }
+ final byte[] keySerialized;
+ if (keySerialization == null) {
+ keySerialized = null;
+ } else {
+ final RowData keyRow = createProjectedRow(consumedRow,
RowKind.INSERT, keyFieldGetters);
+ keySerialized = keySerialization.serialize(keyRow);
+ }
+
+ final byte[] valueSerialized;
+ final RowKind kind = consumedRow.getRowKind();
+ if (upsertMode) {
+ if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) {
+ // transform the message as the tombstone message
+ valueSerialized = null;
+ } else {
+ // make the message to be INSERT to be compliant with the
INSERT-ONLY format
+ final RowData valueRow =
+
DynamicKafkaRecordSerializationSchema.createProjectedRow(
+ consumedRow, kind, valueFieldGetters);
+ valueRow.setRowKind(RowKind.INSERT);
+ valueSerialized = valueSerialization.serialize(valueRow);
+ }
+ } else {
+ final RowData valueRow =
+ DynamicKafkaRecordSerializationSchema.createProjectedRow(
+ consumedRow, kind, valueFieldGetters);
+ valueSerialized = valueSerialization.serialize(valueRow);
+ }
+
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+ topic,
+ extractPartition(
+ consumedRow,
+ keySerialized,
+ valueSerialized,
+ context.getPartitionsForTopic(topic)),
+ readMetadata(consumedRow,
KafkaDynamicSink.WritableMetadata.TIMESTAMP),
+ keySerialized,
+ valueSerialized,
+ readMetadata(consumedRow,
KafkaDynamicSink.WritableMetadata.HEADERS));
+ if (sinkMetricData != null) {
+ sinkMetricData.invoke(1, getDataSize(record));
+ }
+ return record;
+ }
+
+ @Override
+ public void open(
+ SerializationSchema.InitializationContext context,
KafkaSinkContext sinkContext)
+ throws Exception {
+ if (keySerialization != null) {
+ keySerialization.open(context);
+ }
+ if (partitioner != null) {
+ partitioner.open(
+ sinkContext.getParallelInstanceId(),
+ sinkContext.getNumberOfParallelInstances());
+ }
+ valueSerialization.open(context);
+
+ if (metricOption != null) {
+ this.sinkMetricData = new SinkMetricData(metricOption,
context.getMetricGroup());
+ }
+ }
+
+ private Integer extractPartition(
+ RowData consumedRow,
+ @Nullable byte[] keySerialized,
+ byte[] valueSerialized,
+ int[] partitions) {
+ if (partitioner != null) {
+ return partitioner.partition(
+ consumedRow, keySerialized, valueSerialized, topic,
partitions);
+ }
+ return null;
+ }
+
+ static RowData createProjectedRow(
+ RowData consumedRow, RowKind kind, RowData.FieldGetter[]
fieldGetters) {
+ final int arity = fieldGetters.length;
+ final GenericRowData genericRowData = new GenericRowData(kind, arity);
+ for (int fieldPos = 0; fieldPos < arity; fieldPos++) {
+ genericRowData.setField(fieldPos,
fieldGetters[fieldPos].getFieldOrNull(consumedRow));
+ }
+ return genericRowData;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> T readMetadata(RowData consumedRow,
KafkaDynamicSink.WritableMetadata metadata) {
+ final int pos = metadataPositions[metadata.ordinal()];
+ if (pos < 0) {
+ return null;
+ }
+ return (T) metadata.converter.read(consumedRow, pos);
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
index 06cf40ea49..27e9cdb062 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
@@ -27,7 +27,6 @@ import
org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
@@ -50,22 +49,13 @@ import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARTITIONER;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.*;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
-/** Utilities for {@link KafkaConnectorOptions}. */
+/**
+ * Utilities for {@code KafkaConnectorOptions}.
+ *
+ * <p>Copied from org.apache.flink:flink-connector-kafka:1.15.4.
+ */
@Internal
class KafkaConnectorOptionsUtil {
@@ -76,14 +66,14 @@ class KafkaConnectorOptionsUtil {
// Option enumerations
//
--------------------------------------------------------------------------------------------
- // Prefix for Kafka specific properties.
- public static final String PROPERTIES_PREFIX = "properties.";
-
// Sink partitioner.
public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default";
public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN =
"round-robin";
+ // Prefix for Kafka specific properties.
+ public static final String PROPERTIES_PREFIX = "properties.";
+
// Other keywords.
private static final String PARTITION = "partition";
private static final String OFFSET = "offset";
@@ -152,7 +142,7 @@ class KafkaConnectorOptionsUtil {
"'%s' is required in
'%s' startup mode"
+ " but
missing.",
SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
-
KafkaConnectorOptions.ScanStartupMode.TIMESTAMP));
+
ScanStartupMode.TIMESTAMP));
}
break;
@@ -165,7 +155,7 @@ class KafkaConnectorOptionsUtil {
"'%s' is required in
'%s' startup mode"
+ " but
missing.",
SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
-
KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS));
+
ScanStartupMode.SPECIFIC_OFFSETS));
}
if (!isSingleTopic(tableOptions)) {
throw new ValidationException(
@@ -216,56 +206,6 @@ class KafkaConnectorOptionsUtil {
return tableOptions.getOptional(TOPIC).map(t -> t.size() ==
1).orElse(false);
}
- /**
- * Parses SpecificOffsets String to Map.
- *
- * <p>SpecificOffsets String format was given as following:
- *
- * <pre>
- * scan.startup.specific-offsets =
partition:0,offset:42;partition:1,offset:300
- * </pre>
- *
- * @return SpecificOffsets with Map format, key is partition, and value is
offset
- */
- public static Map<Integer, Long> parseSpecificOffsets(
- String specificOffsetsStr, String optionKey) {
- final Map<Integer, Long> offsetMap = new HashMap<>();
- final String[] pairs = specificOffsetsStr.split(";");
- final String validationExceptionMessage =
- String.format(
- "Invalid properties '%s' should follow the format "
- +
"'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
- optionKey, specificOffsetsStr);
-
- if (pairs.length == 0) {
- throw new ValidationException(validationExceptionMessage);
- }
-
- for (String pair : pairs) {
- if (null == pair || pair.length() == 0 || !pair.contains(",")) {
- throw new ValidationException(validationExceptionMessage);
- }
-
- final String[] kv = pair.split(",");
- if (kv.length != 2
- || !kv[0].startsWith(PARTITION + ':')
- || !kv[1].startsWith(OFFSET + ':')) {
- throw new ValidationException(validationExceptionMessage);
- }
-
- String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
- String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
- try {
- final Integer partition = Integer.valueOf(partitionValue);
- final Long offset = Long.valueOf(offsetValue);
- offsetMap.put(partition, offset);
- } catch (NumberFormatException e) {
- throw new ValidationException(validationExceptionMessage, e);
- }
- }
- return offsetMap;
- }
-
public static StartupOptions getStartupOptions(ReadableConfig
tableOptions) {
final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
final StartupMode startupMode =
@@ -306,9 +246,9 @@ class KafkaConnectorOptionsUtil {
/**
* Returns the {@link StartupMode} of Kafka Consumer by passed-in
table-specific {@link
- * KafkaConnectorOptions.ScanStartupMode}.
+ * ScanStartupMode}.
*/
- private static StartupMode
fromOption(KafkaConnectorOptions.ScanStartupMode scanStartupMode) {
+ private static StartupMode fromOption(ScanStartupMode scanStartupMode) {
switch (scanStartupMode) {
case EARLIEST_OFFSET:
return StartupMode.EARLIEST;
@@ -327,12 +267,124 @@ class KafkaConnectorOptionsUtil {
}
}
- static void validateDeliveryGuarantee(ReadableConfig tableOptions) {
- if (tableOptions.get(DELIVERY_GUARANTEE) ==
DeliveryGuarantee.EXACTLY_ONCE
- &&
!tableOptions.getOptional(TRANSACTIONAL_ID_PREFIX).isPresent()) {
+ public static Properties getKafkaProperties(Map<String, String>
tableOptions) {
+ final Properties kafkaProperties = new Properties();
+
+ if (hasKafkaClientProperties(tableOptions)) {
+ tableOptions.keySet().stream()
+ .filter(key -> key.startsWith(PROPERTIES_PREFIX))
+ .forEach(
+ key -> {
+ final String value = tableOptions.get(key);
+ final String subKey =
key.substring((PROPERTIES_PREFIX).length());
+ kafkaProperties.put(subKey, value);
+ });
+ }
+ return kafkaProperties;
+ }
+
+ /**
+ * The partitioner can be either "fixed", "round-robin" or a customized
partitioner full class
+ * name.
+ */
+ public static Optional<FlinkKafkaPartitioner<RowData>>
getFlinkKafkaPartitioner(
+ ReadableConfig tableOptions, ClassLoader classLoader) {
+ return tableOptions
+ .getOptional(SINK_PARTITIONER)
+ .flatMap(
+ (String partitioner) -> {
+ switch (partitioner) {
+ case SINK_PARTITIONER_VALUE_FIXED:
+ return Optional.of(new
FlinkFixedPartitioner<>());
+ case SINK_PARTITIONER_VALUE_DEFAULT:
+ case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
+ return Optional.empty();
+ // Default fallback to full class name of the
partitioner.
+ default:
+ return Optional.of(
+ initializePartitioner(partitioner,
classLoader));
+ }
+ });
+ }
+
+ /**
+ * Parses SpecificOffsets String to Map.
+ *
+ * <p>SpecificOffsets String format was given as following:
+ *
+ * <pre>
+ * scan.startup.specific-offsets =
partition:0,offset:42;partition:1,offset:300
+ * </pre>
+ *
+ * @return SpecificOffsets with Map format, key is partition, and value is
offset
+ */
+ public static Map<Integer, Long> parseSpecificOffsets(
+ String specificOffsetsStr, String optionKey) {
+ final Map<Integer, Long> offsetMap = new HashMap<>();
+ final String[] pairs = specificOffsetsStr.split(";");
+ final String validationExceptionMessage =
+ String.format(
+ "Invalid properties '%s' should follow the format "
+ +
"'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
+ optionKey, specificOffsetsStr);
+
+ if (pairs.length == 0) {
+ throw new ValidationException(validationExceptionMessage);
+ }
+
+ for (String pair : pairs) {
+ if (null == pair || !pair.contains(",")) {
+ throw new ValidationException(validationExceptionMessage);
+ }
+
+ final String[] kv = pair.split(",");
+ if (kv.length != 2
+ || !kv[0].startsWith(PARTITION + ':')
+ || !kv[1].startsWith(OFFSET + ':')) {
+ throw new ValidationException(validationExceptionMessage);
+ }
+
+ String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
+ String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
+ try {
+ final Integer partition = Integer.valueOf(partitionValue);
+ final Long offset = Long.valueOf(offsetValue);
+ offsetMap.put(partition, offset);
+ } catch (NumberFormatException e) {
+ throw new ValidationException(validationExceptionMessage, e);
+ }
+ }
+ return offsetMap;
+ }
+
+ /**
+ * Decides if the table options contains Kafka client properties that
start with prefix
+ * 'properties'.
+ */
+ private static boolean hasKafkaClientProperties(Map<String, String>
tableOptions) {
+ return tableOptions.keySet().stream().anyMatch(k ->
k.startsWith(PROPERTIES_PREFIX));
+ }
+
+ /**
+ * Loads the class {@code name} through {@code classLoader}, checks that it is a
+ * {@link FlinkKafkaPartitioner} and instantiates it.
+ *
+ * @throws ValidationException if the class does not extend {@link FlinkKafkaPartitioner}, or
+ * cannot be found or instantiated
+ */
+ private static <T> FlinkKafkaPartitioner<T> initializePartitioner(
+ String name, ClassLoader classLoader) {
+ try {
+ Class<?> clazz = Class.forName(name, true, classLoader);
+ if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
+ throw new ValidationException(
+ String.format(
+ "Sink partitioner class '%s' should extend
from the required class %s",
+ name, FlinkKafkaPartitioner.class.getName()));
+ }
+ @SuppressWarnings("unchecked")
+ final FlinkKafkaPartitioner<T> kafkaPartitioner =
+ InstantiationUtil.instantiate(name,
FlinkKafkaPartitioner.class, classLoader);
+
+ return kafkaPartitioner;
+ } catch (ClassNotFoundException | FlinkException e) {
throw new ValidationException(
- TRANSACTIONAL_ID_PREFIX.key()
- + " must be specified when using
DeliveryGuarantee.EXACTLY_ONCE.");
+ String.format("Could not find and instantiate partitioner
class '%s'", name),
+ e);
}
}
@@ -340,8 +392,8 @@ class KafkaConnectorOptionsUtil {
* Creates an array of indices that determine which physical fields of the
table schema to
* include in the key format and the order that those fields have in the
key format.
*
- * <p>See {@link KafkaConnectorOptions#KEY_FORMAT}, {@link
KafkaConnectorOptions#KEY_FIELDS},
- * and {@link KafkaConnectorOptions#KEY_FIELDS_PREFIX} for more
information.
+ * <p>See {@code KafkaConnectorOptions#KEY_FORMAT}, {@code KafkaConnectorOptions#KEY_FIELDS},
+ * and {@code KafkaConnectorOptions#KEY_FIELDS_PREFIX} for more information.
*/
public static int[] createKeyFormatProjection(
ReadableConfig options, DataType physicalDataType) {
@@ -406,8 +458,8 @@ class KafkaConnectorOptionsUtil {
* Creates an array of indices that determine which physical fields of the
table schema to
* include in the value format.
*
- * <p>See {@link KafkaConnectorOptions#VALUE_FORMAT}, {@link
- * KafkaConnectorOptions#VALUE_FIELDS_INCLUDE}, and {@link
+ * <p>See {@code KafkaConnectorOptions#VALUE_FORMAT}, {@code
+ * KafkaConnectorOptions#VALUE_FIELDS_INCLUDE}, and {@code
* KafkaConnectorOptions#KEY_FIELDS_PREFIX} for more information.
*/
public static int[] createValueFormatProjection(
@@ -420,19 +472,19 @@ class KafkaConnectorOptionsUtil {
final String keyPrefix =
options.getOptional(KEY_FIELDS_PREFIX).orElse("");
- final KafkaConnectorOptions.ValueFieldsStrategy strategy =
options.get(VALUE_FIELDS_INCLUDE);
- if (strategy == KafkaConnectorOptions.ValueFieldsStrategy.ALL) {
+ final ValueFieldsStrategy strategy = options.get(VALUE_FIELDS_INCLUDE);
+ if (strategy == ValueFieldsStrategy.ALL) {
if (keyPrefix.length() > 0) {
throw new ValidationException(
String.format(
"A key prefix is not allowed when option '%s'
is set to '%s'. "
+ "Set it to '%s' instead to avoid
field overlaps.",
VALUE_FIELDS_INCLUDE.key(),
- KafkaConnectorOptions.ValueFieldsStrategy.ALL,
-
KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY));
+ ValueFieldsStrategy.ALL,
+ ValueFieldsStrategy.EXCEPT_KEY));
}
return physicalFields.toArray();
- } else if (strategy ==
KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY) {
+ } else if (strategy == ValueFieldsStrategy.EXCEPT_KEY) {
final int[] keyProjection = createKeyFormatProjection(options,
physicalDataType);
return physicalFields
.filter(pos -> IntStream.of(keyProjection).noneMatch(k ->
k == pos))
@@ -497,82 +549,20 @@ class KafkaConnectorOptionsUtil {
}
}
- /**
- * The partitioner can be either "fixed", "round-robin" or a customized
partitioner full class
- * name.
- */
- public static Optional<FlinkKafkaPartitioner<RowData>>
getFlinkKafkaPartitioner(
- ReadableConfig tableOptions, ClassLoader classLoader) {
- return tableOptions
- .getOptional(SINK_PARTITIONER)
- .flatMap(
- (String partitioner) -> {
- switch (partitioner) {
- case SINK_PARTITIONER_VALUE_FIXED:
- return Optional.of(new
FlinkFixedPartitioner<>());
- case SINK_PARTITIONER_VALUE_DEFAULT:
- case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
- return Optional.empty();
- // Default fallback to full class name of the
partitioner.
- default:
- return Optional.of(
- initializePartitioner(partitioner,
classLoader));
- }
- });
- }
-
- /** Returns a class value with the given class name. */
- private static <T> FlinkKafkaPartitioner<T> initializePartitioner(
- String name, ClassLoader classLoader) {
- try {
- Class<?> clazz = Class.forName(name, true, classLoader);
- if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
- throw new ValidationException(
- String.format(
- "Sink partitioner class '%s' should extend
from the required class %s",
- name, FlinkKafkaPartitioner.class.getName()));
- }
- @SuppressWarnings("unchecked")
- final FlinkKafkaPartitioner<T> kafkaPartitioner =
- InstantiationUtil.instantiate(name,
FlinkKafkaPartitioner.class, classLoader);
-
- return kafkaPartitioner;
- } catch (ClassNotFoundException | FlinkException e) {
+ static void validateDeliveryGuarantee(ReadableConfig tableOptions) {
+ if (tableOptions.get(DELIVERY_GUARANTEE) ==
DeliveryGuarantee.EXACTLY_ONCE
+ &&
!tableOptions.getOptional(TRANSACTIONAL_ID_PREFIX).isPresent()) {
throw new ValidationException(
- String.format("Could not find and instantiate partitioner
class '%s'", name),
- e);
- }
- }
-
- public static Properties getKafkaProperties(Map<String, String>
tableOptions) {
- final Properties kafkaProperties = new Properties();
-
- if (hasKafkaClientProperties(tableOptions)) {
- tableOptions.keySet().stream()
- .filter(key -> key.startsWith(PROPERTIES_PREFIX))
- .forEach(
- key -> {
- final String value = tableOptions.get(key);
- final String subKey =
key.substring((PROPERTIES_PREFIX).length());
- kafkaProperties.put(subKey, value);
- });
+ TRANSACTIONAL_ID_PREFIX.key()
+ + " must be specified when using
DeliveryGuarantee.EXACTLY_ONCE.");
}
- return kafkaProperties;
- }
-
- /**
- * Decides if the table options contains Kafka client properties that
start with prefix
- * 'properties'.
- */
- private static boolean hasKafkaClientProperties(Map<String, String>
tableOptions) {
- return tableOptions.keySet().stream().anyMatch(k ->
k.startsWith(PROPERTIES_PREFIX));
}
//
--------------------------------------------------------------------------------------------
// Inner classes
//
--------------------------------------------------------------------------------------------
- /** Kafka startup options. */
+    /** Kafka startup options. */
public static class StartupOptions {
public StartupMode startupMode;
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSink.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSink.java
new file mode 100644
index 0000000000..96892d747a
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSink.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.header.Header;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A version-agnostic Kafka {@link DynamicTableSink}.
+ *
+ * <p>Copy from org.apache.flink:flink-connector-kafka:1.15.4. In addition to the upstream
+ * behavior, a {@link MetricOption} is forwarded to the record serializer and, in buffered upsert
+ * mode, to the {@link ReducingUpsertSink}.
+ */
+@Internal
+public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetadata {
+
+    private static final String UPSERT_KAFKA_TRANSFORMATION = "upsert-kafka";
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Metadata that is appended at the end of a physical sink row. */
+    protected List<String> metadataKeys;
+
+    // --------------------------------------------------------------------------------------------
+    // Format attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type of the consumed rows: physical columns followed by the requested metadata. */
+    protected DataType consumedDataType;
+
+    /** Data type to configure the formats. */
+    protected final DataType physicalDataType;
+
+    /** Optional format for encoding keys to Kafka. */
+    protected final @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat;
+
+    /** Format for encoding values to Kafka. */
+    protected final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat;
+
+    /** Indices that determine the key fields and the source position in the consumed row. */
+    protected final int[] keyProjection;
+
+    /** Indices that determine the value fields and the source position in the consumed row. */
+    protected final int[] valueProjection;
+
+    /** Prefix that needs to be removed from fields when constructing the physical data type. */
+    protected final @Nullable String keyPrefix;
+
+    // --------------------------------------------------------------------------------------------
+    // Kafka-specific attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** The defined delivery guarantee. */
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    /**
+     * If the {@link #deliveryGuarantee} is {@link DeliveryGuarantee#EXACTLY_ONCE} the value is the
+     * prefix for all ids of opened Kafka transactions.
+     */
+    @Nullable
+    private final String transactionalIdPrefix;
+
+    /** The Kafka topic to write to. */
+    protected final String topic;
+
+    /** Properties for the Kafka producer. */
+    protected final Properties properties;
+
+    /** Partitioner to select Kafka partition for each item. */
+    protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
+
+    /**
+     * Flag to determine sink mode. In upsert mode sink transforms the delete/update-before message
+     * to tombstone message.
+     */
+    protected final boolean upsertMode;
+
+    /** Sink buffer flush config which is only supported in upsert mode now. */
+    protected final SinkBufferFlushMode flushMode;
+
+    /** Parallelism of the physical Kafka producer. */
+    protected final @Nullable Integer parallelism;
+
+    /** Metric settings handed to the record serializer and the reducing upsert sink. */
+    private final MetricOption metricOption;
+
+    /**
+     * Creates a Kafka table sink.
+     *
+     * @throws NullPointerException if a mandatory argument (data types, value format, projections,
+     *     topic, properties, delivery guarantee or flush mode) is {@code null}
+     * @throws IllegalArgumentException if buffer flushing is enabled while not in upsert mode
+     */
+    public KafkaDynamicSink(
+            DataType consumedDataType,
+            DataType physicalDataType,
+            @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
+            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            String topic,
+            Properties properties,
+            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            DeliveryGuarantee deliveryGuarantee,
+            boolean upsertMode,
+            SinkBufferFlushMode flushMode,
+            @Nullable Integer parallelism,
+            @Nullable String transactionalIdPrefix,
+            MetricOption metricOption) {
+        // Format attributes
+        this.consumedDataType =
+                checkNotNull(consumedDataType, "Consumed data type must not be null.");
+        this.physicalDataType =
+                checkNotNull(physicalDataType, "Physical data type must not be null.");
+        this.keyEncodingFormat = keyEncodingFormat;
+        this.valueEncodingFormat =
+                checkNotNull(valueEncodingFormat, "Value encoding format must not be null.");
+        this.keyProjection = checkNotNull(keyProjection, "Key projection must not be null.");
+        this.valueProjection = checkNotNull(valueProjection, "Value projection must not be null.");
+        this.keyPrefix = keyPrefix;
+        this.transactionalIdPrefix = transactionalIdPrefix;
+        // Mutable attributes
+        this.metadataKeys = Collections.emptyList();
+        // Kafka-specific attributes
+        this.topic = checkNotNull(topic, "Topic must not be null.");
+        this.properties = checkNotNull(properties, "Properties must not be null.");
+        this.partitioner = partitioner;
+        this.deliveryGuarantee =
+                checkNotNull(deliveryGuarantee, "DeliveryGuarantee must not be null.");
+        this.upsertMode = upsertMode;
+        this.flushMode = checkNotNull(flushMode);
+        if (flushMode.isEnabled() && !upsertMode) {
+            throw new IllegalArgumentException(
+                    "Sink buffer flush is only supported in upsert-kafka.");
+        }
+        this.parallelism = parallelism;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return valueEncodingFormat.getChangelogMode();
+    }
+
+    /**
+     * Builds the runtime sink: a plain {@link KafkaSink} wrapped in a {@link SinkV2Provider}, or,
+     * when buffer flushing is enabled in upsert mode, a {@link ReducingUpsertSink} around it.
+     *
+     * @throws NullPointerException if the producer properties lack
+     *     {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG}
+     */
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        final SerializationSchema<RowData> keySerialization =
+                createSerialization(context, keyEncodingFormat, keyProjection, keyPrefix);
+
+        final SerializationSchema<RowData> valueSerialization =
+                createSerialization(context, valueEncodingFormat, valueProjection, null);
+
+        final KafkaSinkBuilder<RowData> sinkBuilder = KafkaSink.builder();
+        final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren();
+        if (transactionalIdPrefix != null) {
+            sinkBuilder.setTransactionalIdPrefix(transactionalIdPrefix);
+        }
+        // Fail with a descriptive message instead of a bare NPE when the servers are missing.
+        final Object bootstrapServers =
+                checkNotNull(
+                        properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                        "Kafka producer property '"
+                                + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+                                + "' must be set.");
+        final KafkaSink<RowData> kafkaSink =
+                sinkBuilder
+                        .setDeliverGuarantee(deliveryGuarantee)
+                        .setBootstrapServers(bootstrapServers.toString())
+                        .setKafkaProducerConfig(properties)
+                        .setRecordSerializer(
+                                new DynamicKafkaRecordSerializationSchema(
+                                        topic,
+                                        partitioner,
+                                        keySerialization,
+                                        valueSerialization,
+                                        getFieldGetters(physicalChildren, keyProjection),
+                                        getFieldGetters(physicalChildren, valueProjection),
+                                        hasMetadata(),
+                                        getMetadataPositions(physicalChildren),
+                                        upsertMode,
+                                        metricOption))
+                        .build();
+        if (flushMode.isEnabled() && upsertMode) {
+            return new DataStreamSinkProvider() {
+
+                @Override
+                public DataStreamSink<?> consumeDataStream(
+                        ProviderContext providerContext, DataStream<RowData> dataStream) {
+                    final boolean objectReuse =
+                            dataStream.getExecutionEnvironment().getConfig().isObjectReuseEnabled();
+                    // The copy function becomes part of the sink that Flink serializes. Create the
+                    // serializer once, here, and bind it via a method reference. A lambda that
+                    // calls createRowDataTypeSerializer would rebuild the serializer for every
+                    // record. It would also capture the non-serializable context, data stream and
+                    // enclosing table sink.
+                    final ReducingUpsertSink<?> sink =
+                            new ReducingUpsertSink<>(
+                                    kafkaSink,
+                                    physicalDataType,
+                                    keyProjection,
+                                    flushMode,
+                                    objectReuse
+                                            ? createRowDataTypeSerializer(
+                                                    context, dataStream.getExecutionConfig())::copy
+                                            : rowData -> rowData,
+                                    metricOption);
+                    final DataStreamSink<RowData> end = dataStream.sinkTo(sink);
+                    providerContext.generateUid(UPSERT_KAFKA_TRANSFORMATION).ifPresent(end::uid);
+                    if (parallelism != null) {
+                        end.setParallelism(parallelism);
+                    }
+                    return end;
+                }
+            };
+        }
+        return SinkV2Provider.of(kafkaSink, parallelism);
+    }
+
+    @Override
+    public Map<String, DataType> listWritableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        Stream.of(WritableMetadata.values())
+                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.consumedDataType = consumedDataType;
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        final KafkaDynamicSink copy =
+                new KafkaDynamicSink(
+                        consumedDataType,
+                        physicalDataType,
+                        keyEncodingFormat,
+                        valueEncodingFormat,
+                        keyProjection,
+                        valueProjection,
+                        keyPrefix,
+                        topic,
+                        properties,
+                        partitioner,
+                        deliveryGuarantee,
+                        upsertMode,
+                        flushMode,
+                        parallelism,
+                        transactionalIdPrefix,
+                        metricOption);
+        copy.metadataKeys = metadataKeys;
+        return copy;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Kafka table sink";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final KafkaDynamicSink that = (KafkaDynamicSink) o;
+        return Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(consumedDataType, that.consumedDataType)
+                && Objects.equals(physicalDataType, that.physicalDataType)
+                && Objects.equals(keyEncodingFormat, that.keyEncodingFormat)
+                && Objects.equals(valueEncodingFormat, that.valueEncodingFormat)
+                && Arrays.equals(keyProjection, that.keyProjection)
+                && Arrays.equals(valueProjection, that.valueProjection)
+                && Objects.equals(keyPrefix, that.keyPrefix)
+                && Objects.equals(topic, that.topic)
+                && Objects.equals(properties, that.properties)
+                && Objects.equals(partitioner, that.partitioner)
+                && Objects.equals(deliveryGuarantee, that.deliveryGuarantee)
+                && upsertMode == that.upsertMode
+                && Objects.equals(flushMode, that.flushMode)
+                && Objects.equals(transactionalIdPrefix, that.transactionalIdPrefix)
+                && Objects.equals(parallelism, that.parallelism)
+                && Objects.equals(metricOption, that.metricOption);
+    }
+
+    @Override
+    public int hashCode() {
+        // The projections are arrays: hash their contents so hashCode stays consistent with the
+        // Arrays.equals comparison in equals().
+        return Objects.hash(
+                metadataKeys,
+                consumedDataType,
+                physicalDataType,
+                keyEncodingFormat,
+                valueEncodingFormat,
+                Arrays.hashCode(keyProjection),
+                Arrays.hashCode(valueProjection),
+                keyPrefix,
+                topic,
+                properties,
+                partitioner,
+                deliveryGuarantee,
+                upsertMode,
+                flushMode,
+                transactionalIdPrefix,
+                parallelism,
+                metricOption);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /** Creates a serializer for {@link #consumedDataType}, used to copy rows under object reuse. */
+    private TypeSerializer<RowData> createRowDataTypeSerializer(
+            Context context, ExecutionConfig executionConfig) {
+        final TypeInformation<RowData> typeInformation =
+                context.createTypeInformation(consumedDataType);
+        return typeInformation.createSerializer(executionConfig);
+    }
+
+    /**
+     * Returns, for every {@link WritableMetadata} constant in declaration order, the position of
+     * that metadata column in the consumed row, or {@code -1} if it was not requested.
+     */
+    private int[] getMetadataPositions(List<LogicalType> physicalChildren) {
+        return Stream.of(WritableMetadata.values())
+                .mapToInt(
+                        m -> {
+                            final int pos = metadataKeys.indexOf(m.key);
+                            if (pos < 0) {
+                                return -1;
+                            }
+                            // Metadata columns follow all physical columns in the consumed row.
+                            return physicalChildren.size() + pos;
+                        })
+                .toArray();
+    }
+
+    /** Returns whether any writable metadata column was requested. */
+    private boolean hasMetadata() {
+        return !metadataKeys.isEmpty();
+    }
+
+    /** Creates one field getter per projected index, reading the physical column at that index. */
+    private RowData.FieldGetter[] getFieldGetters(
+            List<LogicalType> physicalChildren, int[] projection) {
+        return Arrays.stream(projection)
+                .mapToObj(
+                        targetField ->
+                                RowData.createFieldGetter(
+                                        physicalChildren.get(targetField), targetField))
+                .toArray(RowData.FieldGetter[]::new);
+    }
+
+    /**
+     * Creates the runtime encoder of {@code format} for the projected physical fields, stripping
+     * {@code prefix} from the field names if given.
+     *
+     * @return the encoder, or {@code null} if {@code format} is {@code null}
+     */
+    private @Nullable SerializationSchema<RowData> createSerialization(
+            Context context,
+            @Nullable EncodingFormat<SerializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+        DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType);
+        if (prefix != null) {
+            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+        }
+        return format.createRuntimeEncoder(context, physicalFormatDataType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    /** Connector metadata columns that can be written to Kafka records. */
+    enum WritableMetadata {
+
+        HEADERS(
+                "headers",
+                // key and value of the map are nullable to make handling easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
+                        .nullable(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        final MapData map = row.getMap(pos);
+                        final ArrayData keyArray = map.keyArray();
+                        final ArrayData valueArray = map.valueArray();
+                        final List<Header> headers = new ArrayList<>();
+                        for (int i = 0; i < keyArray.size(); i++) {
+                            // Entries with a null key or null value are skipped.
+                            if (!keyArray.isNullAt(i) && !valueArray.isNullAt(i)) {
+                                final String key = keyArray.getString(i).toString();
+                                final byte[] value = valueArray.getBinary(i);
+                                headers.add(new KafkaHeader(key, value));
+                            }
+                        }
+                        return headers;
+                    }
+                }),
+
+        TIMESTAMP(
+                "timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getTimestamp(pos, 3).getMillisecond();
+                    }
+                });
+
+        final String key;
+
+        final DataType dataType;
+
+        final MetadataConverter converter;
+
+        WritableMetadata(String key, DataType dataType, MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+
+    /** Reads a metadata value from position {@code pos} of a consumed row. */
+    interface MetadataConverter extends Serializable {
+
+        Object read(RowData consumedRow, int pos);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /** Simple immutable-reference {@link Header} holding a key and its raw value. */
+    private static class KafkaHeader implements Header {
+
+        private final String key;
+
+        private final byte[] value;
+
+        KafkaHeader(String key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public String key() {
+            return key;
+        }
+
+        @Override
+        public byte[] value() {
+            return value;
+        }
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
new file mode 100644
index 0000000000..652f971785
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import
org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
+import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** A version-agnostic Kafka {@link ScanTableSource}.
+ * <p>
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * */
+@Internal
+public class KafkaDynamicSource
+ implements
+ ScanTableSource,
+ SupportsReadingMetadata,
+ SupportsWatermarkPushDown {
+
+ private static final String KAFKA_TRANSFORMATION = "kafka";
+
+ //
--------------------------------------------------------------------------------------------
+ // Mutable attributes
+ //
--------------------------------------------------------------------------------------------
+
+ /** Data type that describes the final output of the source. */
+ protected DataType producedDataType;
+
+ /** Metadata that is appended at the end of a physical source row. */
+ protected List<String> metadataKeys;
+
+ /** Watermark strategy that is used to generate per-partition watermark. */
+ protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
+
+ //
--------------------------------------------------------------------------------------------
+ // Format attributes
+ //
--------------------------------------------------------------------------------------------
+
+ private static final String VALUE_METADATA_PREFIX = "value.";
+
+ /** Data type to configure the formats. */
+ protected final DataType physicalDataType;
+
+ /** Optional format for decoding keys from Kafka. */
+ protected final @Nullable DecodingFormat<DeserializationSchema<RowData>>
keyDecodingFormat;
+
+ /** Format for decoding values from Kafka. */
+ protected final DecodingFormat<DeserializationSchema<RowData>>
valueDecodingFormat;
+
+ /** Indices that determine the key fields and the target position in the
produced row. */
+ protected final int[] keyProjection;
+
+ /** Indices that determine the value fields and the target position in the
produced row. */
+ protected final int[] valueProjection;
+
+ /** Prefix that needs to be removed from fields when constructing the
physical data type. */
+ protected final @Nullable String keyPrefix;
+
+ //
--------------------------------------------------------------------------------------------
+ // Kafka-specific attributes
+ //
--------------------------------------------------------------------------------------------
+
+ /** The Kafka topics to consume. */
+ protected final List<String> topics;
+
+ /** The Kafka topic pattern to consume. */
+ protected final Pattern topicPattern;
+
+ /** Properties for the Kafka consumer. */
+ protected final Properties properties;
+
+ /**
+     * The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}).
+ */
+ protected final StartupMode startupMode;
+
+ /**
+ * Specific startup offsets; only relevant when startup mode is {@link
+ * StartupMode#SPECIFIC_OFFSETS}.
+ */
+ protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+ /**
+ * The start timestamp to locate partition offsets; only relevant when
startup mode is {@link
+ * StartupMode#TIMESTAMP}.
+ */
+ protected final long startupTimestampMillis;
+
+    /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. */
+ protected final boolean upsertMode;
+
+ protected final String tableIdentifier;
+
+ private final MetricOption metricOption;
+
+ public KafkaDynamicSource(
+ DataType physicalDataType,
+ @Nullable DecodingFormat<DeserializationSchema<RowData>>
keyDecodingFormat,
+ DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+ int[] keyProjection,
+ int[] valueProjection,
+ @Nullable String keyPrefix,
+ @Nullable List<String> topics,
+ @Nullable Pattern topicPattern,
+ Properties properties,
+ StartupMode startupMode,
+ Map<KafkaTopicPartition, Long> specificStartupOffsets,
+ long startupTimestampMillis,
+ boolean upsertMode,
+ String tableIdentifier,
+ MetricOption metricOption) {
+ // Format attributes
+ this.physicalDataType =
+ Preconditions.checkNotNull(
+ physicalDataType, "Physical data type must not be
null.");
+ this.keyDecodingFormat = keyDecodingFormat;
+ this.valueDecodingFormat =
+ Preconditions.checkNotNull(
+ valueDecodingFormat, "Value decoding format must not
be null.");
+ this.keyProjection =
+ Preconditions.checkNotNull(keyProjection, "Key projection must
not be null.");
+ this.valueProjection =
+ Preconditions.checkNotNull(valueProjection, "Value projection
must not be null.");
+ this.keyPrefix = keyPrefix;
+ // Mutable attributes
+ this.producedDataType = physicalDataType;
+ this.metadataKeys = Collections.emptyList();
+ this.watermarkStrategy = null;
+ // Kafka-specific attributes
+ Preconditions.checkArgument(
+ (topics != null && topicPattern == null)
+ || (topics == null && topicPattern != null),
+ "Either Topic or Topic Pattern must be set for source.");
+ this.topics = topics;
+ this.topicPattern = topicPattern;
+ this.properties = Preconditions.checkNotNull(properties, "Properties
must not be null.");
+ this.startupMode =
+ Preconditions.checkNotNull(startupMode, "Startup mode must not
be null.");
+ this.specificStartupOffsets =
+ Preconditions.checkNotNull(
+ specificStartupOffsets, "Specific offsets must not be
null.");
+ this.startupTimestampMillis = startupTimestampMillis;
+ this.upsertMode = upsertMode;
+ this.tableIdentifier = tableIdentifier;
+ this.metricOption = metricOption;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return valueDecodingFormat.getChangelogMode();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+ final DeserializationSchema<RowData> keyDeserialization =
+ createDeserialization(context, keyDecodingFormat,
keyProjection, keyPrefix);
+
+ final DeserializationSchema<RowData> valueDeserialization =
+ createDeserialization(context, valueDecodingFormat,
valueProjection, null);
+
+ final TypeInformation<RowData> producedTypeInfo =
+ context.createTypeInformation(producedDataType);
+
+ final KafkaSource<RowData> kafkaSource =
+ createKafkaSource(keyDeserialization, valueDeserialization,
producedTypeInfo);
+
+ return new DataStreamScanProvider() {
+
+ @Override
+ public DataStream<RowData> produceDataStream(
+ ProviderContext providerContext,
StreamExecutionEnvironment execEnv) {
+ if (watermarkStrategy == null) {
+ watermarkStrategy = WatermarkStrategy.noWatermarks();
+ }
+ DataStreamSource<RowData> sourceStream =
+ execEnv.fromSource(
+ kafkaSource, watermarkStrategy, "KafkaSource-"
+ tableIdentifier);
+
providerContext.generateUid(KAFKA_TRANSFORMATION).ifPresent(sourceStream::uid);
+ return sourceStream;
+ }
+
+ @Override
+ public boolean isBounded() {
+ return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
+ }
+ };
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+ // according to convention, the order of the final row must be
+ // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
+ // where the format metadata has highest precedence
+
+ // add value format metadata with prefix
+ valueDecodingFormat
+ .listReadableMetadata()
+ .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX
+ key, value));
+
+ // add connector metadata
+ Stream.of(ReadableMetadata.values())
+ .forEachOrdered(m -> metadataMap.putIfAbsent(m.key,
m.dataType));
+
+ return metadataMap;
+ }
+
+    /**
+     * Accepts the metadata keys requested by the planner and splits them between the value
+     * format and the connector.
+     *
+     * <p>Keys starting with {@code VALUE_METADATA_PREFIX} belong to the value format. They are
+     * pushed down with the prefix stripped, but only if the format exposes any metadata at all.
+     * Every other key is kept as connector metadata, preserving the requested order.
+     *
+     * @param metadataKeys requested metadata keys
+     * @param producedDataType final row type produced by this source
+     */
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        final List<String> formatMetadataKeys = new ArrayList<>();
+        final List<String> connectorMetadataKeys = new ArrayList<>();
+        for (String key : metadataKeys) {
+            if (key.startsWith(VALUE_METADATA_PREFIX)) {
+                formatMetadataKeys.add(key);
+            } else {
+                connectorMetadataKeys.add(key);
+            }
+        }
+
+        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
+        if (!formatMetadata.isEmpty()) {
+            final int prefixLength = VALUE_METADATA_PREFIX.length();
+            final List<String> requestedFormatMetadataKeys =
+                    new ArrayList<>(formatMetadataKeys.size());
+            for (String key : formatMetadataKeys) {
+                requestedFormatMetadataKeys.add(key.substring(prefixLength));
+            }
+            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        }
+
+        this.metadataKeys = connectorMetadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    /**
+     * Declares that projections cannot be applied to metadata columns of this source.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean supportsMetadataProjection() {
+        final boolean metadataProjectionSupported = false;
+        return metadataProjectionSupported;
+    }
+
+    /**
+     * Stores the watermark strategy pushed down by the planner.
+     *
+     * @param strategy the strategy to use when the source stream is created
+     */
+    @Override
+    public void applyWatermark(WatermarkStrategy<RowData> strategy) {
+        watermarkStrategy = strategy;
+    }
+
+    /**
+     * Creates a copy of this source.
+     *
+     * <p>All constructor arguments are passed through unchanged. The state set later by the
+     * planner (produced data type, connector metadata keys and watermark strategy) is carried
+     * over as well. Field values are shared with the original, not cloned.
+     */
+    @Override
+    public DynamicTableSource copy() {
+        final KafkaDynamicSource duplicate = new KafkaDynamicSource(
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topics,
+                topicPattern,
+                properties,
+                startupMode,
+                specificStartupOffsets,
+                startupTimestampMillis,
+                upsertMode,
+                tableIdentifier,
+                metricOption);
+        duplicate.producedDataType = this.producedDataType;
+        duplicate.metadataKeys = this.metadataKeys;
+        duplicate.watermarkStrategy = this.watermarkStrategy;
+        return duplicate;
+    }
+
+    /** Returns a short, human-readable description of this source. */
+    @Override
+    public String asSummaryString() {
+        final String summary = "Kafka table source";
+        return summary;
+    }
+
+    /**
+     * Two sources are equal when they have the same runtime class and every configuration field
+     * matches.
+     *
+     * <p>Projection arrays are compared element by element. Topic patterns are compared by their
+     * string form, because {@link java.util.regex.Pattern} does not override {@code equals}.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final KafkaDynamicSource other = (KafkaDynamicSource) o;
+
+        // schema
+        if (!(Objects.equals(producedDataType, other.producedDataType)
+                && Objects.equals(metadataKeys, other.metadataKeys)
+                && Objects.equals(physicalDataType, other.physicalDataType))) {
+            return false;
+        }
+        // key / value decoding
+        if (!(Objects.equals(keyDecodingFormat, other.keyDecodingFormat)
+                && Objects.equals(valueDecodingFormat, other.valueDecodingFormat)
+                && Arrays.equals(keyProjection, other.keyProjection)
+                && Arrays.equals(valueProjection, other.valueProjection)
+                && Objects.equals(keyPrefix, other.keyPrefix))) {
+            return false;
+        }
+        // subscription and client properties
+        if (!(Objects.equals(topics, other.topics)
+                && Objects.equals(String.valueOf(topicPattern), String.valueOf(other.topicPattern))
+                && Objects.equals(properties, other.properties))) {
+            return false;
+        }
+        // startup position
+        if (!(startupMode == other.startupMode
+                && Objects.equals(specificStartupOffsets, other.specificStartupOffsets)
+                && startupTimestampMillis == other.startupTimestampMillis)) {
+            return false;
+        }
+        // remaining options
+        return Objects.equals(upsertMode, other.upsertMode)
+                && Objects.equals(tableIdentifier, other.tableIdentifier)
+                && Objects.equals(watermarkStrategy, other.watermarkStrategy)
+                && Objects.equals(metricOption, other.metricOption);
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}.
+     *
+     * <p>{@code equals} compares the projection arrays by content ({@code Arrays.equals}) and the
+     * topic pattern by its string form. The hash must therefore use {@code Arrays.hashCode} and
+     * {@code String.valueOf(topicPattern)}. Hashing the array and {@link java.util.regex.Pattern}
+     * references directly would give equal sources different hash codes.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                producedDataType,
+                metadataKeys,
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                Arrays.hashCode(keyProjection),
+                Arrays.hashCode(valueProjection),
+                keyPrefix,
+                topics,
+                String.valueOf(topicPattern),
+                properties,
+                startupMode,
+                specificStartupOffsets,
+                startupTimestampMillis,
+                upsertMode,
+                tableIdentifier,
+                watermarkStrategy,
+                metricOption);
+    }
+
+ //
--------------------------------------------------------------------------------------------
+
+    /**
+     * Builds the {@link KafkaSource} for this table.
+     *
+     * <p>The source subscribes to the explicit topic list, or to the topic pattern when no list
+     * is configured. Starting offsets are derived from {@code startupMode}, the configured Kafka
+     * properties are applied, and the row deserializer is wrapped into a
+     * {@link KafkaRecordDeserializationSchema}.
+     *
+     * @param keyDeserialization decoder for the record key
+     * @param valueDeserialization decoder for the record value
+     * @param producedTypeInfo type information of the produced rows
+     * @return the configured Kafka source
+     */
+    protected KafkaSource<RowData> createKafkaSource(
+            DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo) {
+        final KafkaDeserializationSchema<RowData> kafkaDeserializer =
+                createKafkaDeserializationSchema(
+                        keyDeserialization, valueDeserialization, producedTypeInfo);
+
+        final KafkaSourceBuilder<RowData> builder = KafkaSource.builder();
+        if (topics == null) {
+            builder.setTopicPattern(topicPattern);
+        } else {
+            builder.setTopics(topics);
+        }
+
+        final OffsetsInitializer startingOffsets = resolveStartingOffsets();
+        if (startingOffsets != null) {
+            builder.setStartingOffsets(startingOffsets);
+        }
+
+        builder.setProperties(properties);
+        builder.setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer));
+        return builder.build();
+    }
+
+    /**
+     * Maps {@code startupMode} to the offsets initializer used by the source.
+     * Returns {@code null} for an unhandled mode, in which case no starting offsets are set.
+     */
+    private OffsetsInitializer resolveStartingOffsets() {
+        switch (startupMode) {
+            case EARLIEST:
+                return OffsetsInitializer.earliest();
+            case LATEST:
+                return OffsetsInitializer.latest();
+            case GROUP_OFFSETS: {
+                // Falls back to NONE when auto.offset.reset is not configured.
+                final String offsetResetConfig = properties.getProperty(
+                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.NONE.name());
+                return OffsetsInitializer.committedOffsets(getResetStrategy(offsetResetConfig));
+            }
+            case SPECIFIC_OFFSETS: {
+                final Map<TopicPartition, Long> offsets = new HashMap<>();
+                specificStartupOffsets.forEach(
+                        (partition, offset) -> offsets.put(
+                                new TopicPartition(partition.getTopic(), partition.getPartition()),
+                                offset));
+                return OffsetsInitializer.offsets(offsets);
+            }
+            case TIMESTAMP:
+                return OffsetsInitializer.timestamp(startupTimestampMillis);
+            default:
+                return null;
+        }
+    }
+
+ private OffsetResetStrategy getResetStrategy(String offsetResetConfig) {
+ return Arrays.stream(OffsetResetStrategy.values())
+ .filter(ors ->
ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
+ .findAny()
+ .orElseThrow(
+ () -> new IllegalArgumentException(
+ String.format(
+ "%s can not be set to %s. Valid
values: [%s]",
+
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ offsetResetConfig,
+
Arrays.stream(OffsetResetStrategy.values())
+ .map(Enum::name)
+ .map(String::toLowerCase)
+
.collect(Collectors.joining(",")))));
+ }
+
+    /**
+     * Creates the {@link DynamicKafkaDeserializationSchema} that assembles produced rows from the
+     * key, the value and the requested connector metadata.
+     *
+     * @param keyDeserialization decoder for the record key
+     * @param valueDeserialization decoder for the record value
+     * @param producedTypeInfo type information of the produced rows
+     * @return the Kafka deserialization schema
+     * @throws IllegalStateException if a connector metadata key is not a {@link ReadableMetadata}
+     *     key
+     */
+    private KafkaDeserializationSchema<RowData> createKafkaDeserializationSchema(
+            DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo) {
+        final int metadataCount = metadataKeys.size();
+
+        // Resolve one converter per requested connector metadata key, in request order.
+        final MetadataConverter[] metadataConverters = new MetadataConverter[metadataCount];
+        int slot = 0;
+        for (String requestedKey : metadataKeys) {
+            ReadableMetadata match = null;
+            for (ReadableMetadata candidate : ReadableMetadata.values()) {
+                if (candidate.key.equals(requestedKey)) {
+                    match = candidate;
+                    break;
+                }
+            }
+            if (match == null) {
+                throw new IllegalStateException();
+            }
+            metadataConverters[slot++] = match.converter;
+        }
+
+        // Connector metadata columns come last in the produced row, so everything before them
+        // is the physical row plus the value format's metadata columns.
+        final int adjustedPhysicalArity =
+                DataType.getFieldDataTypes(producedDataType).size() - metadataCount;
+
+        // Extend the value projection with the value format's metadata columns, which occupy
+        // the positions after the key and value projections.
+        final int formatMetadataStart = keyProjection.length + valueProjection.length;
+        final int extraColumns = Math.max(0, adjustedPhysicalArity - formatMetadataStart);
+        final int[] adjustedValueProjection = new int[valueProjection.length + extraColumns];
+        System.arraycopy(valueProjection, 0, adjustedValueProjection, 0, valueProjection.length);
+        for (int i = 0; i < extraColumns; i++) {
+            adjustedValueProjection[valueProjection.length + i] = formatMetadataStart + i;
+        }
+
+        return new DynamicKafkaDeserializationSchema(
+                adjustedPhysicalArity,
+                keyDeserialization,
+                keyProjection,
+                valueDeserialization,
+                adjustedValueProjection,
+                metadataCount > 0,
+                metadataConverters,
+                producedTypeInfo,
+                upsertMode,
+                metricOption);
+    }
+
+    /**
+     * Creates the runtime decoder for the projected part of the physical row.
+     *
+     * @param context runtime context passed to the format
+     * @param format the decoding format, or {@code null} if none is configured
+     * @param projection indices of the physical fields decoded by this format
+     * @param prefix field-name prefix stripped from the projected row type, or {@code null}
+     * @return the decoder, or {@code null} when {@code format} is {@code null}
+     */
+    private @Nullable DeserializationSchema<RowData> createDeserialization(
+            Context context,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+        final DataType projected = Projection.of(projection).project(this.physicalDataType);
+        final DataType formatDataType =
+                prefix == null ? projected : DataTypeUtils.stripRowPrefix(projected, prefix);
+        return format.createRuntimeDecoder(context, formatDataType);
+    }
+
+ //
--------------------------------------------------------------------------------------------
+ // Metadata handling
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Connector metadata columns that can be read from each Kafka {@link ConsumerRecord}.
+ *
+ * <p>Each constant pairs the metadata key exposed to SQL, its Flink {@link DataType}, and a
+ * {@link MetadataConverter} that extracts the value from a record. Requested metadata keys
+ * are matched against {@code key} when the deserialization schema is built.
+ */
+ enum ReadableMetadata {
+
+ // Name of the topic the record was read from.
+ TOPIC(
+ "topic",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return StringData.fromString(record.topic());
+ }
+ }),
+
+ // Partition number the record was read from.
+ PARTITION(
+ "partition",
+ DataTypes.INT().notNull(),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return record.partition();
+ }
+ }),
+
+ // All record headers as a map from header key to raw header bytes. If a header key occurs
+ // more than once, the value of the last occurrence is kept (HashMap.put overwrites).
+ HEADERS(
+ "headers",
+ // key and value of the map are nullable to make handling
easier in queries
+ DataTypes.MAP(DataTypes.STRING().nullable(),
DataTypes.BYTES().nullable())
+ .notNull(),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ final Map<StringData, byte[]> map = new HashMap<>();
+ for (Header header : record.headers()) {
+ map.put(StringData.fromString(header.key()),
header.value());
+ }
+ return new GenericMapData(map);
+ }
+ }),
+
+ // Leader epoch of the record; null when the record carries no leader epoch.
+ LEADER_EPOCH(
+ "leader-epoch",
+ DataTypes.INT().nullable(),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return record.leaderEpoch().orElse(null);
+ }
+ }),
+
+ // Offset of the record within its partition.
+ OFFSET(
+ "offset",
+ DataTypes.BIGINT().notNull(),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return record.offset();
+ }
+ }),
+
+ // Record timestamp, converted from epoch milliseconds to TIMESTAMP_LTZ(3).
+ TIMESTAMP(
+ "timestamp",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return
TimestampData.fromEpochMillis(record.timestamp());
+ }
+ }),
+
+ // String form of the record's Kafka timestamp type (record.timestampType()).
+ TIMESTAMP_TYPE(
+ "timestamp-type",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return
StringData.fromString(record.timestampType().toString());
+ }
+ });
+
+ // Metadata key as listed by listReadableMetadata and matched in createKafkaDeserializationSchema.
+ final String key;
+
+ // Flink data type of the metadata column.
+ final DataType dataType;
+
+ // Extracts the column value from a consumer record.
+ final MetadataConverter converter;
+
+ ReadableMetadata(String key, DataType dataType, MetadataConverter
converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.converter = converter;
+ }
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index c87735ffc8..3a320b7f88 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.kafka.table;
import org.apache.inlong.sort.base.Constants;
-import org.apache.inlong.sort.kafka.KafkaOptions;
+import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -32,8 +32,6 @@ import
org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -67,40 +65,16 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARTITIONER;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.*;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
-import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
-import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.StartupOptions;
-import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject;
-import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
-import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
-import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getFlinkKafkaPartitioner;
-import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
-import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
-import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
-import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getStartupOptions;
-import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.validateTableSinkOptions;
-import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.validateTableSourceOptions;
+import static org.apache.inlong.sort.base.Constants.*;
+import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.*;
/**
- * Factory for creating configured instances of {@link KafkaDynamicSource} and
{@link
+ * Factory for creating configured instances of KafkaDynamicSource and
* KafkaDynamicSink}.
+ * <p>
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
*/
@Internal
public class KafkaDynamicTableFactory implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
@@ -181,6 +155,7 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
+
final TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
final Optional<DecodingFormat<DeserializationSchema<RowData>>>
keyDecodingFormat =
@@ -222,6 +197,15 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
final String keyPrefix =
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+ String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+ String auditKeys = tableOptions.get(AUDIT_KEYS);
+
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
return createKafkaTableSource(
physicalDataType,
keyDecodingFormat.orElse(null),
@@ -235,7 +219,8 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
startupOptions.startupMode,
startupOptions.specificOffsets,
startupOptions.startupTimestampMillis,
- context.getObjectIdentifier().asSummaryString());
+ context.getObjectIdentifier().asSummaryString(),
+ metricOption);
}
@Override
@@ -250,7 +235,7 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat
=
getValueEncodingFormat(helper);
- helper.validateExcept(KafkaConnectorOptionsUtil.PROPERTIES_PREFIX);
+ helper.validateExcept(PROPERTIES_PREFIX);
final ReadableConfig tableOptions = helper.getOptions();
@@ -275,6 +260,16 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
final Integer parallelism =
tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
+ String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+ String auditKeys = tableOptions.get(AUDIT_KEYS);
+
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
+
return createKafkaTableSink(
physicalDataType,
keyEncodingFormat.orElse(null),
@@ -287,7 +282,8 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
getFlinkKafkaPartitioner(tableOptions,
context.getClassLoader()).orElse(null),
deliveryGuarantee,
parallelism,
- tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+ tableOptions.get(TRANSACTIONAL_ID_PREFIX),
+ metricOption);
}
private static Optional<DecodingFormat<DeserializationSchema<RowData>>>
getKeyDecodingFormat(
@@ -390,7 +386,8 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
- String tableIdentifier) {
+ String tableIdentifier,
+ MetricOption metricOption) {
return new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
@@ -405,7 +402,8 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
specificStartupOffsets,
startupTimestampMillis,
false,
- tableIdentifier);
+ tableIdentifier,
+ metricOption);
}
protected KafkaDynamicSink createKafkaTableSink(
@@ -420,7 +418,8 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
FlinkKafkaPartitioner<RowData> partitioner,
DeliveryGuarantee deliveryGuarantee,
Integer parallelism,
- @Nullable String transactionalIdPrefix) {
+ @Nullable String transactionalIdPrefix,
+ MetricOption metricOption) {
return new KafkaDynamicSink(
physicalDataType,
physicalDataType,
@@ -436,7 +435,8 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
false,
SinkBufferFlushMode.DISABLED,
parallelism,
- transactionalIdPrefix);
+ transactionalIdPrefix,
+ metricOption);
}
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
similarity index 92%
rename from
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java
rename to
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
index 6962eb6948..2fe0b41baa 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
@@ -15,12 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.kafka;
+package org.apache.inlong.sort.kafka.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
-/** Option utils for Kafka table source sink. */
+/** Option utils for Kafka table source sink.
+ * <p>
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * */
public class KafkaOptions {
private KafkaOptions() {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
new file mode 100644
index 0000000000..e00e219fe8
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.function.SerializableFunction;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A wrapper of a {@link Sink} whose writers are wrapped in a {@link ReducingUpsertWriter}.
+ * That writer buffers the rows emitted towards the wrapped {@link SinkWriter}, keeping only
+ * the latest row per key. It forwards them when the buffer is full, when the flush interval
+ * elapses, or when the writer is flushed (e.g. before a checkpoint).
+ *
+ * <p>Because the updates are idempotent, duplicates have no effect, so the sink provides
+ * eventual consistency without a two-phase commit protocol.
+ *
+ * <p>Copy from org.apache.flink:flink-connector-kafka:1.15.4, extended with an InLong
+ * {@link MetricOption} that is handed to every writer it creates.
+ *
+ * @param <WriterState> state type of the wrapped sink's writer, passed through unchanged
+ */
+class ReducingUpsertSink<WriterState> implements StatefulSink<RowData, WriterState> {
+
+    private final StatefulSink<RowData, WriterState> wrappedSink;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private final SinkBufferFlushMode bufferFlushMode;
+    private final SerializableFunction<RowData, RowData> valueCopyFunction;
+    private final MetricOption metricOption;
+
+    ReducingUpsertSink(
+            StatefulSink<RowData, WriterState> wrappedSink,
+            DataType physicalDataType,
+            int[] keyProjection,
+            SinkBufferFlushMode bufferFlushMode,
+            SerializableFunction<RowData, RowData> valueCopyFunction,
+            MetricOption metricOption) {
+        this.wrappedSink = wrappedSink;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.bufferFlushMode = bufferFlushMode;
+        this.valueCopyFunction = valueCopyFunction;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public StatefulSinkWriter<RowData, WriterState> createWriter(InitContext context)
+            throws IOException {
+        return reducing(wrappedSink.createWriter(context), context);
+    }
+
+    @Override
+    public StatefulSinkWriter<RowData, WriterState> restoreWriter(
+            InitContext context, Collection<WriterState> recoveredState) throws IOException {
+        return reducing(wrappedSink.restoreWriter(context, recoveredState), context);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<WriterState> getWriterStateSerializer() {
+        return wrappedSink.getWriterStateSerializer();
+    }
+
+    /** Wraps {@code delegate} in a {@link ReducingUpsertWriter} configured from this sink. */
+    private StatefulSinkWriter<RowData, WriterState> reducing(
+            StatefulSinkWriter<RowData, WriterState> delegate, InitContext context) {
+        return new ReducingUpsertWriter<>(
+                delegate,
+                physicalDataType,
+                keyProjection,
+                bufferFlushMode,
+                context.getProcessingTimeService(),
+                valueCopyFunction,
+                metricOption);
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
new file mode 100644
index 0000000000..b118e3cde5
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static
org.apache.inlong.sort.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+
+/**
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * @param <WriterState>
+ */
+class ReducingUpsertWriter<WriterState>
+ implements
+ StatefulSink.StatefulSinkWriter<RowData, WriterState> {
+
+ private final StatefulSink.StatefulSinkWriter<RowData, WriterState>
wrappedWriter;
+ private final WrappedContext wrappedContext = new WrappedContext();
+ private final int batchMaxRowNums;
+ private final Function<RowData, RowData> valueCopyFunction;
+ private final Map<RowData, Tuple2<RowData, Long>> reduceBuffer = new
HashMap<>();
+ private final Function<RowData, RowData> keyExtractor;
+ private final ProcessingTimeService timeService;
+ private final long batchIntervalMs;
+
+ private boolean closed = false;
+ private long lastFlush = System.currentTimeMillis();
+
+ private SourceMetricData sourceMetricData;
+
+ ReducingUpsertWriter(
+ StatefulSink.StatefulSinkWriter<RowData, WriterState>
wrappedWriter,
+ DataType physicalDataType,
+ int[] keyProjection,
+ SinkBufferFlushMode bufferFlushMode,
+ ProcessingTimeService timeService,
+ Function<RowData, RowData> valueCopyFunction,
+ MetricOption metricOption) {
+ checkArgument(bufferFlushMode != null && bufferFlushMode.isEnabled());
+ this.wrappedWriter = checkNotNull(wrappedWriter);
+ this.timeService = checkNotNull(timeService);
+ this.batchMaxRowNums = bufferFlushMode.getBatchSize();
+ this.batchIntervalMs = bufferFlushMode.getBatchIntervalMs();
+ registerFlush();
+ List<LogicalType> fields =
physicalDataType.getLogicalType().getChildren();
+ final RowData.FieldGetter[] keyFieldGetters =
+ Arrays.stream(keyProjection)
+ .mapToObj(
+ targetField -> RowData.createFieldGetter(
+ fields.get(targetField), targetField))
+ .toArray(RowData.FieldGetter[]::new);
+ this.keyExtractor = rowData -> createProjectedRow(rowData,
RowKind.INSERT, keyFieldGetters);
+ this.valueCopyFunction = valueCopyFunction;
+ if (metricOption != null) {
+ this.sourceMetricData = new SourceMetricData(metricOption);
+ }
+ }
+
+ @Override
+ public void write(RowData element, Context context) throws IOException,
InterruptedException {
+ wrappedContext.setContext(context);
+ addToBuffer(element, context.timestamp());
+ }
+
+ @Override
+ public void flush(boolean endOfInput) throws IOException,
InterruptedException {
+ flush();
+ }
+
+ @Override
+ public List<WriterState> snapshotState(long checkpointId) throws
IOException {
+ return wrappedWriter.snapshotState(checkpointId);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closed) {
+ closed = true;
+ wrappedWriter.close();
+ }
+ }
+
+ private void addToBuffer(RowData row, Long timestamp) throws IOException,
InterruptedException {
+ RowData key = keyExtractor.apply(row);
+ RowData value = valueCopyFunction.apply(row);
+ reduceBuffer.put(key, new Tuple2<>(changeFlag(value), timestamp));
+
+ if (reduceBuffer.size() >= batchMaxRowNums) {
+ flush();
+ }
+ }
+
+ private void registerFlush() {
+ if (closed) {
+ return;
+ }
+ timeService.registerTimer(
+ lastFlush + batchIntervalMs,
+ (t) -> {
+ if (t >= lastFlush + batchIntervalMs) {
+ flush();
+ }
+ registerFlush();
+ });
+ }
+
+ private RowData changeFlag(RowData value) {
+ switch (value.getRowKind()) {
+ case INSERT:
+ case UPDATE_AFTER:
+ value.setRowKind(UPDATE_AFTER);
+ break;
+ case UPDATE_BEFORE:
+ case DELETE:
+ value.setRowKind(DELETE);
+ }
+ return value;
+ }
+
+ private void flush() throws IOException, InterruptedException {
+ for (Tuple2<RowData, Long> value : reduceBuffer.values()) {
+ wrappedContext.setTimestamp(value.f1);
+ wrappedWriter.write(value.f0, wrappedContext);
+ if (sourceMetricData != null) {
+ sourceMetricData.outputMetricsWithEstimate(value.f0);
+ }
+ }
+ lastFlush = System.currentTimeMillis();
+ reduceBuffer.clear();
+ }
+
+ /**
+ * Wrapper of {@link Context}.
+ *
+ * <p>When records arrives, the {@link ReducingUpsertWriter} updates the
current {@link
+ * Context} and memorize the timestamp with the records. When flushing,
the {@link
+ * ReducingUpsertWriter} will emit the records in the buffer with
memorized timestamp.
+ */
+ private static class WrappedContext implements Context {
+
+ private long timestamp;
+ private Context context;
+
+ @Override
+ public long currentWatermark() {
+ checkNotNull(context, "context must be set before retrieving it.");
+ return context.currentWatermark();
+ }
+
+ @Override
+ public Long timestamp() {
+ checkNotNull(timestamp, "timestamp must to be set before
retrieving it.");
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public void setContext(Context context) {
+ this.context = context;
+ }
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index 2f61265c32..6a8f5e20c9 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.kafka.table;
import org.apache.inlong.sort.base.Constants;
-import org.apache.inlong.sort.kafka.KafkaOptions;
+import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -29,8 +29,6 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -58,17 +56,8 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Set;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
-import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.*;
+import static org.apache.inlong.sort.base.Constants.*;
import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject;
import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
@@ -77,7 +66,10 @@ import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getKa
import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
import static
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
-/** Upsert-Kafka factory. */
+/** Upsert-Kafka factory.
+ * <p>
+ * Copied from org.apache.flink:flink-connector-kafka:1.15.4.
+ */
public class UpsertKafkaDynamicTableFactory implements
DynamicTableSourceFactory, DynamicTableSinkFactory {
public static final String IDENTIFIER = "upsert-kafka-inlong";
@@ -138,6 +130,16 @@ public class UpsertKafkaDynamicTableFactory implements
DynamicTableSourceFactory
// always use earliest to keep data integrity
StartupMode earliest = StartupMode.EARLIEST;
+ String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+ String auditKeys = tableOptions.get(AUDIT_KEYS);
+
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
+
return new KafkaDynamicSource(
context.getPhysicalRowDataType(),
keyDecodingFormat,
@@ -152,7 +154,8 @@ public class UpsertKafkaDynamicTableFactory implements
DynamicTableSourceFactory
Collections.emptyMap(),
0,
true,
- context.getObjectIdentifier().asSummaryString());
+ context.getObjectIdentifier().asSummaryString(),
+ metricOption);
}
@Override
@@ -188,6 +191,15 @@ public class UpsertKafkaDynamicTableFactory implements
DynamicTableSourceFactory
SinkBufferFlushMode flushMode =
new SinkBufferFlushMode(batchSize, batchInterval.toMillis());
+ String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+ String auditKeys = tableOptions.get(AUDIT_KEYS);
+
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
// use {@link
org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
// it will use hash partition if key is set else in round-robin
behaviour.
return new KafkaDynamicSink(
@@ -205,7 +217,8 @@ public class UpsertKafkaDynamicTableFactory implements
DynamicTableSourceFactory
true,
flushMode,
parallelism,
- tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+ tableOptions.get(TRANSACTIONAL_ID_PREFIX),
+ metricOption);
}
private Tuple2<int[], int[]>
createKeyValueProjections(ResolvedCatalogTable catalogTable) {
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index c980b63fbd..3546604c06 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -815,9 +815,16 @@
Source : flink-connector-hbase-2.2 1.15.4 (Please note that the software
have been modified.)
License : https://github.com/apache/flink/blob/master/LICENSE
-1.3.20
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
-
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+1.3.20
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaRecordSerializationSchema.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSink.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
Source : org.apache.flink:flink-connector-kafka:1.15.4 (Please note that
the software have been modified.)
License : https://github.com/apache/flink/blob/master/LICENSE