This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 49900eb96 [INLONG-7609][Sort] Add audit for Kafka source connector (#7610)
49900eb96 is described below

commit 49900eb9661a384b2ef362355b937fab3881640f
Author: Schnapps <[email protected]>
AuthorDate: Wed Mar 15 14:41:20 2023 +0800

    [INLONG-7609][Sort] Add audit for Kafka source connector (#7610)
---
 inlong-sort/sort-connectors/base/pom.xml           |  6 +++++
 .../inlong/sort/base/metric}/MetricsCollector.java | 10 +++++---
 .../inlong/sort/kafka/FlinkKafkaConsumer.java      | 30 ++++++++++++----------
 .../inlong/sort/kafka/FlinkKafkaConsumerBase.java  |  6 ++++-
 .../table/DynamicKafkaDeserializationSchema.java   | 21 +++------------
 .../sort/kafka/table/KafkaDynamicSource.java       | 18 ++++++++-----
 .../sort/kafka/table/KafkaDynamicTableFactory.java | 12 ++++++---
 .../table/UpsertKafkaDynamicTableFactory.java      |  5 +++-
 inlong-sort/sort-connectors/pulsar/pom.xml         |  6 -----
 .../table/DynamicPulsarDeserializationSchema.java  |  2 +-
 10 files changed, 63 insertions(+), 53 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/pom.xml b/inlong-sort/sort-connectors/base/pom.xml
index 3942a5887..f0c3b2e65 100644
--- a/inlong-sort/sort-connectors/base/pom.xml
+++ b/inlong-sort/sort-connectors/base/pom.xml
@@ -43,6 +43,12 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk-s3</artifactId>
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/MetricsCollector.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricsCollector.java
similarity index 87%
rename from 
inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/MetricsCollector.java
rename to 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricsCollector.java
index 947337dbe..629dfec23 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/MetricsCollector.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricsCollector.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.base.metric;
 
 import org.apache.flink.util.Collector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.formats.base.collectors.TimestampedCollector;
 
 /**
@@ -27,7 +26,7 @@ import 
org.apache.inlong.sort.formats.base.collectors.TimestampedCollector;
  */
 public class MetricsCollector<T> implements TimestampedCollector<T> {
 
-    private Collector<T> collector;
+    private final Collector<T> collector;
 
     private long timestampMillis;
 
@@ -45,11 +44,14 @@ public class MetricsCollector<T> implements 
TimestampedCollector<T> {
 
     @Override
     public void collect(T record) {
-        metricData.outputMetricsWithEstimate(record, timestampMillis);
+        if (metricData != null) {
+            metricData.outputMetricsWithEstimate(record, timestampMillis);
+        }
         collector.collect(record);
     }
 
     @Override
     public void close() {
     }
+
 }
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
index fb51985ae..f0c453796 100644
--- 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
@@ -111,8 +111,8 @@ public class FlinkKafkaConsumer<T> extends 
FlinkKafkaConsumerBase<T> {
      */
     public FlinkKafkaConsumer(
             String topic, DeserializationSchema<T> valueDeserializer, 
Properties props, String inlongMetric,
-            String auditHostAndPorts) {
-        this(Collections.singletonList(topic), valueDeserializer, props, 
inlongMetric, auditHostAndPorts);
+            String auditHostAndPorts, String auditKeys) {
+        this(Collections.singletonList(topic), valueDeserializer, props, 
inlongMetric, auditHostAndPorts, auditKeys);
     }
 
     /**
@@ -128,8 +128,8 @@ public class FlinkKafkaConsumer<T> extends 
FlinkKafkaConsumerBase<T> {
      */
     public FlinkKafkaConsumer(
             String topic, KafkaDeserializationSchema<T> deserializer, 
Properties props, String inlongMetric,
-            String auditHostAndPorts) {
-        this(Collections.singletonList(topic), deserializer, props, 
inlongMetric, auditHostAndPorts);
+            String auditHostAndPorts, String auditKeys) {
+        this(Collections.singletonList(topic), deserializer, props, 
inlongMetric, auditHostAndPorts, auditKeys);
     }
 
     /**
@@ -144,8 +144,9 @@ public class FlinkKafkaConsumer<T> extends 
FlinkKafkaConsumerBase<T> {
      */
     public FlinkKafkaConsumer(
             List<String> topics, DeserializationSchema<T> deserializer, 
Properties props, String inlongMetric,
-            String auditHostAndPorts) {
-        this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), 
props, inlongMetric, auditHostAndPorts);
+            String auditHostAndPorts, String auditKeys) {
+        this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), 
props,
+                inlongMetric, auditHostAndPorts, auditKeys);
     }
 
     /**
@@ -160,8 +161,8 @@ public class FlinkKafkaConsumer<T> extends 
FlinkKafkaConsumerBase<T> {
      */
     public FlinkKafkaConsumer(
             List<String> topics, KafkaDeserializationSchema<T> deserializer, 
Properties props, String inlongMetric,
-            String auditHostAndPorts) {
-        this(topics, null, deserializer, props, inlongMetric, 
auditHostAndPorts);
+            String auditHostAndPorts, String auditKeys) {
+        this(topics, null, deserializer, props, inlongMetric, 
auditHostAndPorts, auditKeys);
     }
 
     /**
@@ -180,12 +181,12 @@ public class FlinkKafkaConsumer<T> extends 
FlinkKafkaConsumerBase<T> {
      */
     public FlinkKafkaConsumer(
             Pattern subscriptionPattern, DeserializationSchema<T> 
valueDeserializer,
-            Properties props, String inlongMetric, String auditHostAndPorts) {
+            Properties props, String inlongMetric, String auditHostAndPorts, 
String auditKeys) {
         this(
                 null,
                 subscriptionPattern,
                 new KafkaDeserializationSchemaWrapper<>(valueDeserializer),
-                props, inlongMetric, auditHostAndPorts);
+                props, inlongMetric, auditHostAndPorts, auditKeys);
     }
 
     /**
@@ -208,8 +209,8 @@ public class FlinkKafkaConsumer<T> extends 
FlinkKafkaConsumerBase<T> {
     public FlinkKafkaConsumer(
             Pattern subscriptionPattern,
             KafkaDeserializationSchema<T> deserializer,
-            Properties props, String inlongMetric, String auditHostAndPorts) {
-        this(null, subscriptionPattern, deserializer, props, inlongMetric, 
auditHostAndPorts);
+            Properties props, String inlongMetric, String auditHostAndPorts, 
String auditKeys) {
+        this(null, subscriptionPattern, deserializer, props, inlongMetric, 
auditHostAndPorts, auditKeys);
     }
 
     private FlinkKafkaConsumer(
@@ -217,7 +218,7 @@ public class FlinkKafkaConsumer<T> extends 
FlinkKafkaConsumerBase<T> {
             Pattern subscriptionPattern,
             KafkaDeserializationSchema<T> deserializer,
             Properties props, String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts, String auditKeys) {
 
         super(
                 topics,
@@ -227,7 +228,8 @@ public class FlinkKafkaConsumer<T> extends 
FlinkKafkaConsumerBase<T> {
                         checkNotNull(props, "props"),
                         KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                         PARTITION_DISCOVERY_DISABLED),
-                !getBoolean(props, KEY_DISABLE_METRICS, false), inlongMetric, 
auditHostAndPorts);
+                !getBoolean(props, KEY_DISABLE_METRICS, false),
+                inlongMetric, auditHostAndPorts, auditKeys);
 
         this.properties = props;
         setDeserializer(this.properties);
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
index b3aa4dbbb..93f0661f7 100644
--- 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
@@ -294,6 +294,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
 
     private SourceMetricData sourceMetricData;
 
+    private String auditKeys;
+
     // ------------------------------------------------------------------------
 
     /**
@@ -310,7 +312,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
             Pattern topicPattern,
             KafkaDeserializationSchema<T> deserializer,
             long discoveryIntervalMillis,
-            boolean useMetrics, String inlongMetric, String auditHostAndPorts) 
{
+            boolean useMetrics, String inlongMetric, String auditHostAndPorts, 
String auditKeys) {
         this.topicsDescriptor = new KafkaTopicsDescriptor(topics, 
topicPattern);
         this.deserializer = checkNotNull(deserializer, "valueDeserializer");
 
@@ -323,6 +325,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
         this.useMetrics = useMetrics;
         this.inlongMetric = inlongMetric;
         this.inlongAudit = auditHostAndPorts;
+        this.auditKeys = auditKeys;
     }
 
     /**
@@ -833,6 +836,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
+                .withAuditKeys(auditKeys)
                 .build();
         if (metricOption != null) {
             sourceMetricData = new SourceMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index 5d74ff3d1..928ed155d 100644
--- 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -32,6 +32,7 @@ import org.apache.inlong.sort.base.dirty.DirtyData;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
@@ -134,13 +135,10 @@ public class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSc
             throws Exception {
         // shortcut in case no output projection is required,
         // also not for a cartesian product with the keys
+
         if (keyDeserialization == null && !hasMetadata) {
             deserializeWithDirtyHandle(record.value(), 
DirtyType.VALUE_DESERIALIZE_ERROR,
-                    valueDeserialization, collector);
-            // output metrics
-            if (metricData != null) {
-                outputMetrics(record);
-            }
+                    valueDeserialization, new MetricsCollector<>(collector, 
metricData));
             return;
         }
 
@@ -160,11 +158,7 @@ public class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSc
             outputCollector.collect(null);
         } else {
             deserializeWithDirtyHandle(record.value(), 
DirtyType.VALUE_DESERIALIZE_ERROR,
-                    valueDeserialization, outputCollector);
-            // output metrics
-            if (metricData != null) {
-                outputMetrics(record);
-            }
+                    valueDeserialization, new 
MetricsCollector<>(outputCollector, metricData));
         }
         keyCollector.buffer.clear();
     }
@@ -199,13 +193,6 @@ public class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSc
         }
     }
 
-    private void outputMetrics(ConsumerRecord<byte[], byte[]> record) {
-        long dataSize = record.value() == null ? 0L : record.value().length;
-        if (metricData != null) {
-            metricData.outputMetrics(1L, dataSize);
-        }
-    }
-
     @Override
     public TypeInformation<RowData> getProducedType() {
         return producedTypeInfo;
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index 29028b356..a1d7c8f27 100644
--- 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -176,6 +176,8 @@ public class KafkaDynamicSource
 
     protected final String inlongMetric;
 
+    protected final String auditKeys;
+
     protected final String auditHostAndPorts;
 
     private final DirtyOptions dirtyOptions;
@@ -202,7 +204,8 @@ public class KafkaDynamicSource
             final String inlongMetric,
             final String auditHostAndPorts,
             DirtyOptions dirtyOptions,
-            @Nullable DirtySink<String> dirtySink) {
+            @Nullable DirtySink<String> dirtySink,
+            final String auditKeys) {
         // Format attributes
         this.physicalDataType =
                 Preconditions.checkNotNull(
@@ -239,6 +242,7 @@ public class KafkaDynamicSource
         this.auditHostAndPorts = auditHostAndPorts;
         this.dirtyOptions = dirtyOptions;
         this.dirtySink = dirtySink;
+        this.auditKeys = auditKeys;
     }
 
     @Override
@@ -259,7 +263,7 @@ public class KafkaDynamicSource
 
         final FlinkKafkaConsumer<RowData> kafkaConsumer =
                 createKafkaConsumer(keyDeserialization, valueDeserialization,
-                        producedTypeInfo, inlongMetric, auditHostAndPorts);
+                        producedTypeInfo, inlongMetric, auditHostAndPorts, 
auditKeys);
 
         return SourceFunctionProvider.of(kafkaConsumer, false);
     }
@@ -333,7 +337,8 @@ public class KafkaDynamicSource
                         inlongMetric,
                         auditHostAndPorts,
                         dirtyOptions,
-                        dirtySink);
+                        dirtySink,
+                        auditKeys);
         copy.producedDataType = producedDataType;
         copy.metadataKeys = metadataKeys;
         copy.watermarkStrategy = watermarkStrategy;
@@ -400,7 +405,8 @@ public class KafkaDynamicSource
             DeserializationSchema<RowData> valueDeserialization,
             TypeInformation<RowData> producedTypeInfo,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            String auditKeys) {
 
         final MetadataConverter[] metadataConverters =
                 metadataKeys.stream()
@@ -445,10 +451,10 @@ public class KafkaDynamicSource
         final FlinkKafkaConsumer<RowData> kafkaConsumer;
         if (topics != null) {
             kafkaConsumer = new FlinkKafkaConsumer<>(topics, 
kafkaDeserializer, properties, inlongMetric,
-                    auditHostAndPorts);
+                    auditHostAndPorts, auditKeys);
         } else {
             kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, 
kafkaDeserializer, properties, inlongMetric,
-                    auditHostAndPorts);
+                    auditHostAndPorts, auditKeys);
         }
 
         switch (startupMode) {
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index f2d48e95a..f4014bb3e 100644
--- 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -363,6 +363,9 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
         final String inlongMetric = 
tableOptions.getOptional(INLONG_METRIC).orElse(null);
 
         final String auditHostAndPorts = 
tableOptions.getOptional(INLONG_AUDIT).orElse(null);
+
+        final String auditKeys = 
tableOptions.getOptional(AUDIT_KEYS).orElse(null);
+
         // Build the dirty data side-output
         final DirtyOptions dirtyOptions = 
DirtyOptions.fromConfig(tableOptions);
         final DirtySink<String> dirtySink = 
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
@@ -382,7 +385,8 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
                 inlongMetric,
                 auditHostAndPorts,
                 dirtyOptions,
-                dirtySink);
+                dirtySink,
+                auditKeys);
     }
 
     @Override
@@ -493,7 +497,8 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
             String inlongMetric,
             String auditHostAndPorts,
             DirtyOptions dirtyOptions,
-            @Nullable DirtySink<String> dirtySink) {
+            @Nullable DirtySink<String> dirtySink,
+            String auditKeys) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -511,7 +516,8 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
                 inlongMetric,
                 auditHostAndPorts,
                 dirtyOptions,
-                dirtySink);
+                dirtySink,
+                auditKeys);
     }
 
     protected KafkaDynamicSink createKafkaTableSink(
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index 73507aa6c..c0d228001 100644
--- 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -69,6 +69,7 @@ import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.aut
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -219,6 +220,7 @@ public class UpsertKafkaDynamicTableFactory
         final DirtySink<String> dirtySink = 
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
         final String inlongMetric = 
tableOptions.getOptional(INLONG_METRIC).orElse(null);
         final String auditHostAndPorts = 
tableOptions.getOptional(INLONG_AUDIT).orElse(null);
+        final String auditKeys = 
tableOptions.getOptional(AUDIT_KEYS).orElse(null);
         return new KafkaDynamicSource(
                 schema.toPhysicalRowDataType(),
                 keyDecodingFormat,
@@ -236,7 +238,8 @@ public class UpsertKafkaDynamicTableFactory
                 inlongMetric,
                 auditHostAndPorts,
                 dirtyOptions,
-                dirtySink);
+                dirtySink,
+                auditKeys);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/pulsar/pom.xml 
b/inlong-sort/sort-connectors/pulsar/pom.xml
index 5d5e3da54..c7b493def 100644
--- a/inlong-sort/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-connectors/pulsar/pom.xml
@@ -62,12 +62,6 @@
             <artifactId>audit-sdk</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-format-base</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index f3c71e029..31a5f57be 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -29,8 +29,8 @@ import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
-import org.apache.inlong.sort.pulsar.withoutadmin.MetricsCollector;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 

Reply via email to