This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0dd43462bd [INLONG-10381][Sort] Kafka source connector report audit attach input time (#10393)
0dd43462bd is described below
commit 0dd43462bd17f8ba518a9aa15708decba8879059
Author: XiaoYou201 <[email protected]>
AuthorDate: Thu Jun 13 16:25:10 2024 +0800
[INLONG-10381][Sort] Kafka source connector report audit attach input time (#10393)
---
.../protocol/node/extract/KafkaExtractNode.java | 11 ++++++++-
.../node/extract/KafkaExtractNodeTest.java | 1 +
.../sort-flink-v1.15/sort-connectors/kafka/pom.xml | 6 ++++-
.../table/DynamicKafkaDeserializationSchema.java | 27 +++++++++++++++++++---
.../sort/kafka/table/KafkaDynamicSource.java | 13 +++++++++++
5 files changed, 53 insertions(+), 5 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index f595f0f85d..f8b4a9757a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -27,6 +27,7 @@ import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.transformation.WatermarkField;
@@ -262,6 +263,13 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
case TIMESTAMP:
metadataKey = "timestamp";
break;
+ case AUDIT_DATA_TIME:
+ if (format instanceof InLongMsgFormat) {
+ metadataKey = INLONG_MSG_AUDIT_TIME;
+ } else {
+ metadataKey = CONSUME_AUDIT_TIME;
+ }
+ break;
default:
throw new
UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
this.getClass().getSimpleName(), metaField));
@@ -279,6 +287,7 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
case PARTITION:
case OFFSET:
case TIMESTAMP:
+ case AUDIT_DATA_TIME:
return true;
default:
return false;
@@ -291,6 +300,6 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS,
MetaField.OP_TS, MetaField.IS_DDL,
MetaField.MYSQL_TYPE, MetaField.BATCH_ID,
MetaField.UPDATE_BEFORE,
MetaField.KEY, MetaField.VALUE, MetaField.PARTITION,
MetaField.HEADERS,
- MetaField.HEADERS_TO_JSON_STR, MetaField.OFFSET,
MetaField.TIMESTAMP);
+ MetaField.HEADERS_TO_JSON_STR, MetaField.OFFSET,
MetaField.TIMESTAMP, MetaField.AUDIT_DATA_TIME);
}
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index b7b8e327aa..69333effb1 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -112,6 +112,7 @@ public class KafkaExtractNodeTest extends SerializeBaseTest<KafkaExtractNode> {
formatMap.put(MetaField.OFFSET, "BIGINT METADATA FROM 'offset'
VIRTUAL");
formatMap.put(MetaField.PARTITION, "BIGINT METADATA FROM 'partition'
VIRTUAL");
formatMap.put(MetaField.TIMESTAMP, "TIMESTAMP_LTZ(3) METADATA FROM
'timestamp' VIRTUAL");
+ formatMap.put(MetaField.AUDIT_DATA_TIME, "BIGINT METADATA FROM
'consume_time' VIRTUAL");
KafkaExtractNode node = getTestObject();
boolean formatEquals = true;
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
index ad7d93946a..019b0f456c 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
@@ -54,8 +54,12 @@
<artifactId>audit-sdk</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>audit-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
-
<build>
<plugins>
<plugin>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index 7863321c9e..4406189081 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -63,6 +63,9 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
private final MetricOption metricOption;
private SourceMetricData sourceMetricData;
+
+ private int consumeTimeIndex = -1;
+
DynamicKafkaDeserializationSchema(
int physicalArity,
@Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -104,6 +107,12 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
if (metricOption != null) {
sourceMetricData = new SourceMetricData(metricOption);
}
+ for (int i = 0; i < outputCollector.metadataConverters.length; i++) {
+ if (outputCollector.metadataConverters[i]
+
.equals(KafkaDynamicSource.ReadableMetadata.CONSUME_TIME.converter)) {
+ consumeTimeIndex = i;
+ }
+ }
}
@Override
@@ -131,12 +140,16 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
if (keyDeserialization != null) {
keyDeserialization.deserialize(record.key(), keyCollector);
}
-
// project output while emitting values
outputCollector.inputRecord = record;
outputCollector.physicalKeyRows = keyCollector.buffer;
- outputCollector.outputCollector =
- sourceMetricData == null ? collector : new
MetricsCollector<>(collector, sourceMetricData);
+ if (sourceMetricData != null) {
+ MetricsCollector<RowData> metricsCollector = new
MetricsCollector<>(collector, sourceMetricData);
+
metricsCollector.resetTimestamp(getRecordTime(outputCollector.metadataConverters,
record));
+ outputCollector.outputCollector = metricsCollector;
+ } else {
+ outputCollector.outputCollector = collector;
+ }
if (record.value() == null && upsertMode) {
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
@@ -146,6 +159,14 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
keyCollector.buffer.clear();
}
+ private Long getRecordTime(MetadataConverter[] metadataConverters, // Returns the time (epoch ms) attached to this record's audit metrics.
ConsumerRecord<byte[], byte[]> record) {
+ if (consumeTimeIndex == -1) { // open() found no CONSUME_TIME converter: fall back to the current wall-clock time
+ return System.currentTimeMillis();
+ }
+ return (Long) metadataConverters[consumeTimeIndex].read(record); // NOTE(review): the CONSUME_TIME converter also just returns System.currentTimeMillis(), so both branches yield "now"; open() locates it via equals() (identity for that anonymous class), which may not match a deserialized copy of the converter - confirm
+ }
+
@Override
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index 652f971785..322e6fc758 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.kafka.table;
import org.apache.inlong.sort.base.metric.MetricOption;
import
org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -620,6 +621,18 @@ public class KafkaDynamicSource
public Object read(ConsumerRecord<?, ?> record) {
return
StringData.fromString(record.timestampType().toString());
}
+ }),
+ CONSUME_TIME(
+ ExtractNode.CONSUME_AUDIT_TIME,
+ DataTypes.BIGINT().notNull(),
+ new MetadataConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) { // consume time: the record's contents are not inspected
+ return System.currentTimeMillis(); // wall-clock epoch millis at the moment of the call, autoboxed to Long for the BIGINT NOT NULL type declared above
+ }
});
final String key;