This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e2897c983 [INLONG-10573][Sort] Make pulsar source connector report 
audit attach input time (#10574)
7e2897c983 is described below

commit 7e2897c983ad61d656e6ab83b2844add820c1b5f
Author: XiaoYou201 <[email protected]>
AuthorDate: Tue Jul 9 17:00:42 2024 +0800

    [INLONG-10573][Sort] Make pulsar source connector report audit attach input 
time (#10574)
---
 .../org/apache/inlong/sort/pulsar/PulsarTableFactory.java |  6 ------
 .../inlong/sort/pulsar/table/PulsarReadableMetadata.java  | 15 +++++++++++++--
 .../inlong/sort/pulsar/table/PulsarRowDataConverter.java  |  2 +-
 .../pulsar/table/PulsarTableDeserializationSchema.java    | 15 +++------------
 .../table/PulsarTableDeserializationSchemaFactory.java    |  6 ------
 5 files changed, 17 insertions(+), 27 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
index 80ef5b6363..780dfde8b5 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
@@ -47,7 +47,6 @@ import java.util.stream.Stream;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
-import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
 import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
@@ -95,8 +94,6 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
 
     public static final boolean UPSERT_DISABLED = false;
 
-    public static boolean innerFormat = false;
-
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
@@ -107,8 +104,6 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
                 getValueDecodingFormat(helper);
         ReadableConfig tableOptions = helper.getOptions();
 
-        innerFormat = ExtractNode.INLONG_MSG.equals(tableOptions.get(FORMAT));
-
         // Validate configs are not conflict; each options is consumed; no 
unwanted configs
         // PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not 
part of the validation.
         helper.validateExcept(
@@ -160,7 +155,6 @@ public class PulsarTableFactory implements 
DynamicTableSourceFactory {
                         valueDecodingFormat,
                         valueProjection,
                         UPSERT_DISABLED,
-                        innerFormat,
                         inlongMetric,
                         auditHostAndPorts,
                         auditKeys);
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java
index 6740e6111f..5a40bb1896 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java
@@ -17,14 +17,17 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
+import org.apache.inlong.sort.base.metric.MetricsCollector;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Collector;
 import org.apache.pulsar.client.api.Message;
 
 import java.io.Serializable;
@@ -34,6 +37,8 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.inlong.sort.pulsar.table.PulsarReadableMetadata.ReadableMetadata.CONSUME_TIME;
+
 /**
  * Class for reading metadata fields from a Pulsar message and put in 
corresponding Flink row
  * fields.
@@ -66,10 +71,16 @@ public class PulsarReadableMetadata implements Serializable 
{
     }
 
     public void appendProducedRowWithMetadata(
-            GenericRowData producedRowData, int physicalArity, Message<?> 
message) {
+            GenericRowData producedRowData, int physicalArity, Message<?> 
message, Collector<RowData> collector) {
         for (int metadataPos = 0; metadataPos < metadataConverters.size(); 
metadataPos++) {
+            Object metadata = 
metadataConverters.get(metadataPos).read(message);
             producedRowData.setField(
-                    physicalArity + metadataPos, 
metadataConverters.get(metadataPos).read(message));
+                    physicalArity + metadataPos, metadata);
+            if 
(CONSUME_TIME.key.equals(connectorMetadataKeys.get(metadataPos)) &&
+                    collector instanceof MetricsCollector) {
+                ((MetricsCollector<RowData>) collector).resetTimestamp((Long) 
metadata);
+            }
+
         }
     }
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
index 2f87ca1564..c66e25c225 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
@@ -122,7 +122,7 @@ public class PulsarRowDataConverter implements Serializable 
{
             producedRow.setField(keyProjection[keyPos], 
physicalKeyRow.getField(keyPos));
         }
 
-        readableMetadata.appendProducedRowWithMetadata(producedRow, 
physicalArity, message);
+        readableMetadata.appendProducedRowWithMetadata(producedRow, 
physicalArity, message, collector);
         collector.collect(producedRow);
     }
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
index 554c81a33f..c05f485af6 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
@@ -57,8 +57,6 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
 
     private final boolean upsertMode;
 
-    private final boolean innerFormat;
-
     private SourceExactlyMetric sourceExactlyMetric;
 
     private MetricOption metricOption;
@@ -69,7 +67,6 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
             TypeInformation<RowData> producedTypeInfo,
             PulsarRowDataConverter rowDataConverter,
             boolean upsertMode,
-            boolean innerFormat,
             MetricOption metricOption) {
         if (upsertMode) {
             checkNotNull(keyDeserialization, "upsert mode must specify a key 
format");
@@ -79,7 +76,6 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
         this.rowDataConverter = checkNotNull(rowDataConverter);
         this.producedTypeInfo = checkNotNull(producedTypeInfo);
         this.upsertMode = upsertMode;
-        this.innerFormat = innerFormat;
         this.metricOption = metricOption;
     }
 
@@ -113,17 +109,12 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
         }
 
         MetricsCollector<RowData> metricsCollector =
-                new MetricsCollector<>(new ListCollector<>(valueRowData), 
sourceExactlyMetric);
-
-        // reset timestamp if the deserialize schema has not inner format
-        if (!innerFormat) {
-            metricsCollector.resetTimestamp(System.currentTimeMillis());
-        }
+                new MetricsCollector<>(collector, sourceExactlyMetric);
 
-        valueDeserialization.deserialize(message.getData(), metricsCollector);
+        valueDeserialization.deserialize(message.getData(), new 
ListCollector<>(valueRowData));
 
         rowDataConverter.projectToProducedRowAndCollect(
-                message, keyRowData, valueRowData, collector);
+                message, keyRowData, valueRowData, metricsCollector);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
index c193a34857..97148b47f6 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
@@ -80,8 +80,6 @@ public class PulsarTableDeserializationSchemaFactory 
implements Serializable {
 
     private final int[] valueProjection;
 
-    private final boolean innerFormat;
-
     // 
--------------------------------------------------------------------------------------------
     // Mutable attributes. Will be updated after the applyReadableMetadata()
     // 
--------------------------------------------------------------------------------------------
@@ -103,7 +101,6 @@ public class PulsarTableDeserializationSchemaFactory 
implements Serializable {
             DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
             int[] valueProjection,
             boolean upsertMode,
-            boolean innerFormat,
             String inlongMetric,
             String auditHostAndPorts,
             String auditKeys) {
@@ -119,8 +116,6 @@ public class PulsarTableDeserializationSchemaFactory 
implements Serializable {
         this.producedDataType = physicalDataType;
         this.connectorMetadataKeys = Collections.emptyList();
         this.upsertMode = upsertMode;
-        this.innerFormat = innerFormat;
-
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
         this.auditKeys = auditKeys;
@@ -182,7 +177,6 @@ public class PulsarTableDeserializationSchemaFactory 
implements Serializable {
                 producedTypeInfo,
                 rowDataConverter,
                 upsertMode,
-                innerFormat,
                 metricOption);
     }
 

Reply via email to