This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dd43462bd [INLONG-10381][Sort] Kafka source connector report audit 
attach input time (#10393)
0dd43462bd is described below

commit 0dd43462bd17f8ba518a9aa15708decba8879059
Author: XiaoYou201 <[email protected]>
AuthorDate: Thu Jun 13 16:25:10 2024 +0800

    [INLONG-10381][Sort] Kafka source connector report audit attach input time 
(#10393)
---
 .../protocol/node/extract/KafkaExtractNode.java    | 11 ++++++++-
 .../node/extract/KafkaExtractNodeTest.java         |  1 +
 .../sort-flink-v1.15/sort-connectors/kafka/pom.xml |  6 ++++-
 .../table/DynamicKafkaDeserializationSchema.java   | 27 +++++++++++++++++++---
 .../sort/kafka/table/KafkaDynamicSource.java       | 13 +++++++++++
 5 files changed, 53 insertions(+), 5 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index f595f0f85d..f8b4a9757a 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -27,6 +27,7 @@ import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.format.AvroFormat;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
 
@@ -262,6 +263,13 @@ public class KafkaExtractNode extends ExtractNode 
implements InlongMetric, Metad
             case TIMESTAMP:
                 metadataKey = "timestamp";
                 break;
+            case AUDIT_DATA_TIME:
+                if (format instanceof InLongMsgFormat) {
+                    metadataKey = INLONG_MSG_AUDIT_TIME;
+                } else {
+                    metadataKey = CONSUME_AUDIT_TIME;
+                }
+                break;
             default:
                 throw new 
UnsupportedOperationException(String.format("Unsupport meta field for %s: %s",
                         this.getClass().getSimpleName(), metaField));
@@ -279,6 +287,7 @@ public class KafkaExtractNode extends ExtractNode 
implements InlongMetric, Metad
             case PARTITION:
             case OFFSET:
             case TIMESTAMP:
+            case AUDIT_DATA_TIME:
                 return true;
             default:
                 return false;
@@ -291,6 +300,6 @@ public class KafkaExtractNode extends ExtractNode 
implements InlongMetric, Metad
                 MetaField.SQL_TYPE, MetaField.PK_NAMES, MetaField.TS, 
MetaField.OP_TS, MetaField.IS_DDL,
                 MetaField.MYSQL_TYPE, MetaField.BATCH_ID, 
MetaField.UPDATE_BEFORE,
                 MetaField.KEY, MetaField.VALUE, MetaField.PARTITION, 
MetaField.HEADERS,
-                MetaField.HEADERS_TO_JSON_STR, MetaField.OFFSET, 
MetaField.TIMESTAMP);
+                MetaField.HEADERS_TO_JSON_STR, MetaField.OFFSET, 
MetaField.TIMESTAMP, MetaField.AUDIT_DATA_TIME);
     }
 }
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index b7b8e327aa..69333effb1 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -112,6 +112,7 @@ public class KafkaExtractNodeTest extends 
SerializeBaseTest<KafkaExtractNode> {
         formatMap.put(MetaField.OFFSET, "BIGINT METADATA FROM 'offset' 
VIRTUAL");
         formatMap.put(MetaField.PARTITION, "BIGINT METADATA FROM 'partition' 
VIRTUAL");
         formatMap.put(MetaField.TIMESTAMP, "TIMESTAMP_LTZ(3) METADATA FROM 
'timestamp' VIRTUAL");
+        formatMap.put(MetaField.AUDIT_DATA_TIME, "BIGINT METADATA FROM 
'consume_time' VIRTUAL");
 
         KafkaExtractNode node = getTestObject();
         boolean formatEquals = true;
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
index ad7d93946a..019b0f456c 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/pom.xml
@@ -54,8 +54,12 @@
             <artifactId>audit-sdk</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
-
     <build>
         <plugins>
             <plugin>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index 7863321c9e..4406189081 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -63,6 +63,9 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
     private final MetricOption metricOption;
 
     private SourceMetricData sourceMetricData;
+
+    private int consumeTimeIndex = -1;
+
     DynamicKafkaDeserializationSchema(
             int physicalArity,
             @Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -104,6 +107,12 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
         if (metricOption != null) {
             sourceMetricData = new SourceMetricData(metricOption);
         }
+        for (int i = 0; i < outputCollector.metadataConverters.length; i++) {
+            if (outputCollector.metadataConverters[i]
+                    
.equals(KafkaDynamicSource.ReadableMetadata.CONSUME_TIME.converter)) {
+                consumeTimeIndex = i;
+            }
+        }
     }
 
     @Override
@@ -131,12 +140,16 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
         if (keyDeserialization != null) {
             keyDeserialization.deserialize(record.key(), keyCollector);
         }
-
         // project output while emitting values
         outputCollector.inputRecord = record;
         outputCollector.physicalKeyRows = keyCollector.buffer;
-        outputCollector.outputCollector =
-                sourceMetricData == null ? collector : new 
MetricsCollector<>(collector, sourceMetricData);
+        if (sourceMetricData != null) {
+            MetricsCollector<RowData> metricsCollector = new 
MetricsCollector<>(collector, sourceMetricData);
+            
metricsCollector.resetTimestamp(getRecordTime(outputCollector.metadataConverters,
 record));
+            outputCollector.outputCollector = metricsCollector;
+        } else {
+            outputCollector.outputCollector = collector;
+        }
         if (record.value() == null && upsertMode) {
             // collect tombstone messages in upsert mode by hand
             outputCollector.collect(null);
@@ -146,6 +159,14 @@ class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<Ro
         keyCollector.buffer.clear();
     }
 
+    private Long getRecordTime(MetadataConverter[] metadataConverters,
+            ConsumerRecord<byte[], byte[]> record) {
+        if (consumeTimeIndex == -1) {
+            return System.currentTimeMillis();
+        }
+        return (Long) metadataConverters[consumeTimeIndex].read(record);
+    }
+
     @Override
     public TypeInformation<RowData> getProducedType() {
         return producedTypeInfo;
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index 652f971785..322e6fc758 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.kafka.table;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
 import 
org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
+import org.apache.inlong.sort.protocol.node.ExtractNode;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -620,6 +621,18 @@ public class KafkaDynamicSource
                     public Object read(ConsumerRecord<?, ?> record) {
                         return 
StringData.fromString(record.timestampType().toString());
                     }
+                }),
+        CONSUME_TIME(
+                ExtractNode.CONSUME_AUDIT_TIME,
+                DataTypes.BIGINT().notNull(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return System.currentTimeMillis();
+                    }
                 });
 
         final String key;

Reply via email to