This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 49900eb96 [INLONG-7609][Sort] Add audit for Kafka source connector (#7610)
49900eb96 is described below
commit 49900eb9661a384b2ef362355b937fab3881640f
Author: Schnapps <[email protected]>
AuthorDate: Wed Mar 15 14:41:20 2023 +0800
[INLONG-7609][Sort] Add audit for Kafka source connector (#7610)
---
inlong-sort/sort-connectors/base/pom.xml | 6 +++++
.../inlong/sort/base/metric}/MetricsCollector.java | 10 +++++---
.../inlong/sort/kafka/FlinkKafkaConsumer.java | 30 ++++++++++++----------
.../inlong/sort/kafka/FlinkKafkaConsumerBase.java | 6 ++++-
.../table/DynamicKafkaDeserializationSchema.java | 21 +++------------
.../sort/kafka/table/KafkaDynamicSource.java | 18 ++++++++-----
.../sort/kafka/table/KafkaDynamicTableFactory.java | 12 ++++++---
.../table/UpsertKafkaDynamicTableFactory.java | 5 +++-
inlong-sort/sort-connectors/pulsar/pom.xml | 6 -----
.../table/DynamicPulsarDeserializationSchema.java | 2 +-
10 files changed, 63 insertions(+), 53 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/pom.xml
b/inlong-sort/sort-connectors/base/pom.xml
index 3942a5887..f0c3b2e65 100644
--- a/inlong-sort/sort-connectors/base/pom.xml
+++ b/inlong-sort/sort-connectors/base/pom.xml
@@ -43,6 +43,12 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-format-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/MetricsCollector.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricsCollector.java
similarity index 87%
rename from
inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/MetricsCollector.java
rename to
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricsCollector.java
index 947337dbe..629dfec23 100644
---
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/MetricsCollector.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricsCollector.java
@@ -15,10 +15,9 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.pulsar.withoutadmin;
+package org.apache.inlong.sort.base.metric;
import org.apache.flink.util.Collector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.formats.base.collectors.TimestampedCollector;
/**
@@ -27,7 +26,7 @@ import
org.apache.inlong.sort.formats.base.collectors.TimestampedCollector;
*/
public class MetricsCollector<T> implements TimestampedCollector<T> {
- private Collector<T> collector;
+ private final Collector<T> collector;
private long timestampMillis;
@@ -45,11 +44,14 @@ public class MetricsCollector<T> implements
TimestampedCollector<T> {
@Override
public void collect(T record) {
- metricData.outputMetricsWithEstimate(record, timestampMillis);
+ if (metricData != null) {
+ metricData.outputMetricsWithEstimate(record, timestampMillis);
+ }
collector.collect(record);
}
@Override
public void close() {
}
+
}
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
index fb51985ae..f0c453796 100644
---
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
@@ -111,8 +111,8 @@ public class FlinkKafkaConsumer<T> extends
FlinkKafkaConsumerBase<T> {
*/
public FlinkKafkaConsumer(
String topic, DeserializationSchema<T> valueDeserializer,
Properties props, String inlongMetric,
- String auditHostAndPorts) {
- this(Collections.singletonList(topic), valueDeserializer, props,
inlongMetric, auditHostAndPorts);
+ String auditHostAndPorts, String auditKeys) {
+ this(Collections.singletonList(topic), valueDeserializer, props,
inlongMetric, auditHostAndPorts, auditKeys);
}
/**
@@ -128,8 +128,8 @@ public class FlinkKafkaConsumer<T> extends
FlinkKafkaConsumerBase<T> {
*/
public FlinkKafkaConsumer(
String topic, KafkaDeserializationSchema<T> deserializer,
Properties props, String inlongMetric,
- String auditHostAndPorts) {
- this(Collections.singletonList(topic), deserializer, props,
inlongMetric, auditHostAndPorts);
+ String auditHostAndPorts, String auditKeys) {
+ this(Collections.singletonList(topic), deserializer, props,
inlongMetric, auditHostAndPorts, auditKeys);
}
/**
@@ -144,8 +144,9 @@ public class FlinkKafkaConsumer<T> extends
FlinkKafkaConsumerBase<T> {
*/
public FlinkKafkaConsumer(
List<String> topics, DeserializationSchema<T> deserializer,
Properties props, String inlongMetric,
- String auditHostAndPorts) {
- this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer),
props, inlongMetric, auditHostAndPorts);
+ String auditHostAndPorts, String auditKeys) {
+ this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer),
props,
+ inlongMetric, auditHostAndPorts, auditKeys);
}
/**
@@ -160,8 +161,8 @@ public class FlinkKafkaConsumer<T> extends
FlinkKafkaConsumerBase<T> {
*/
public FlinkKafkaConsumer(
List<String> topics, KafkaDeserializationSchema<T> deserializer,
Properties props, String inlongMetric,
- String auditHostAndPorts) {
- this(topics, null, deserializer, props, inlongMetric,
auditHostAndPorts);
+ String auditHostAndPorts, String auditKeys) {
+ this(topics, null, deserializer, props, inlongMetric,
auditHostAndPorts, auditKeys);
}
/**
@@ -180,12 +181,12 @@ public class FlinkKafkaConsumer<T> extends
FlinkKafkaConsumerBase<T> {
*/
public FlinkKafkaConsumer(
Pattern subscriptionPattern, DeserializationSchema<T>
valueDeserializer,
- Properties props, String inlongMetric, String auditHostAndPorts) {
+ Properties props, String inlongMetric, String auditHostAndPorts,
String auditKeys) {
this(
null,
subscriptionPattern,
new KafkaDeserializationSchemaWrapper<>(valueDeserializer),
- props, inlongMetric, auditHostAndPorts);
+ props, inlongMetric, auditHostAndPorts, auditKeys);
}
/**
@@ -208,8 +209,8 @@ public class FlinkKafkaConsumer<T> extends
FlinkKafkaConsumerBase<T> {
public FlinkKafkaConsumer(
Pattern subscriptionPattern,
KafkaDeserializationSchema<T> deserializer,
- Properties props, String inlongMetric, String auditHostAndPorts) {
- this(null, subscriptionPattern, deserializer, props, inlongMetric,
auditHostAndPorts);
+ Properties props, String inlongMetric, String auditHostAndPorts,
String auditKeys) {
+ this(null, subscriptionPattern, deserializer, props, inlongMetric,
auditHostAndPorts, auditKeys);
}
private FlinkKafkaConsumer(
@@ -217,7 +218,7 @@ public class FlinkKafkaConsumer<T> extends
FlinkKafkaConsumerBase<T> {
Pattern subscriptionPattern,
KafkaDeserializationSchema<T> deserializer,
Properties props, String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts, String auditKeys) {
super(
topics,
@@ -227,7 +228,8 @@ public class FlinkKafkaConsumer<T> extends
FlinkKafkaConsumerBase<T> {
checkNotNull(props, "props"),
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
PARTITION_DISCOVERY_DISABLED),
- !getBoolean(props, KEY_DISABLE_METRICS, false), inlongMetric,
auditHostAndPorts);
+ !getBoolean(props, KEY_DISABLE_METRICS, false),
+ inlongMetric, auditHostAndPorts, auditKeys);
this.properties = props;
setDeserializer(this.properties);
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
index b3aa4dbbb..93f0661f7 100644
---
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
@@ -294,6 +294,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends
RichParallelSourceFuncti
private SourceMetricData sourceMetricData;
+ private String auditKeys;
+
// ------------------------------------------------------------------------
/**
@@ -310,7 +312,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends
RichParallelSourceFuncti
Pattern topicPattern,
KafkaDeserializationSchema<T> deserializer,
long discoveryIntervalMillis,
- boolean useMetrics, String inlongMetric, String auditHostAndPorts)
{
+ boolean useMetrics, String inlongMetric, String auditHostAndPorts,
String auditKeys) {
this.topicsDescriptor = new KafkaTopicsDescriptor(topics,
topicPattern);
this.deserializer = checkNotNull(deserializer, "valueDeserializer");
@@ -323,6 +325,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends
RichParallelSourceFuncti
this.useMetrics = useMetrics;
this.inlongMetric = inlongMetric;
this.inlongAudit = auditHostAndPorts;
+ this.auditKeys = auditKeys;
}
/**
@@ -833,6 +836,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends
RichParallelSourceFuncti
.withRegisterMetric(RegisteredMetric.ALL)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
+ .withAuditKeys(auditKeys)
.build();
if (metricOption != null) {
sourceMetricData = new SourceMetricData(metricOption,
getRuntimeContext().getMetricGroup());
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index 5d74ff3d1..928ed155d 100644
---
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -32,6 +32,7 @@ import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
@@ -134,13 +135,10 @@ public class DynamicKafkaDeserializationSchema implements
KafkaDeserializationSc
throws Exception {
// shortcut in case no output projection is required,
// also not for a cartesian product with the keys
+
if (keyDeserialization == null && !hasMetadata) {
deserializeWithDirtyHandle(record.value(),
DirtyType.VALUE_DESERIALIZE_ERROR,
- valueDeserialization, collector);
- // output metrics
- if (metricData != null) {
- outputMetrics(record);
- }
+ valueDeserialization, new MetricsCollector<>(collector,
metricData));
return;
}
@@ -160,11 +158,7 @@ public class DynamicKafkaDeserializationSchema implements
KafkaDeserializationSc
outputCollector.collect(null);
} else {
deserializeWithDirtyHandle(record.value(),
DirtyType.VALUE_DESERIALIZE_ERROR,
- valueDeserialization, outputCollector);
- // output metrics
- if (metricData != null) {
- outputMetrics(record);
- }
+ valueDeserialization, new
MetricsCollector<>(outputCollector, metricData));
}
keyCollector.buffer.clear();
}
@@ -199,13 +193,6 @@ public class DynamicKafkaDeserializationSchema implements
KafkaDeserializationSc
}
}
- private void outputMetrics(ConsumerRecord<byte[], byte[]> record) {
- long dataSize = record.value() == null ? 0L : record.value().length;
- if (metricData != null) {
- metricData.outputMetrics(1L, dataSize);
- }
- }
-
@Override
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index 29028b356..a1d7c8f27 100644
---
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -176,6 +176,8 @@ public class KafkaDynamicSource
protected final String inlongMetric;
+ protected final String auditKeys;
+
protected final String auditHostAndPorts;
private final DirtyOptions dirtyOptions;
@@ -202,7 +204,8 @@ public class KafkaDynamicSource
final String inlongMetric,
final String auditHostAndPorts,
DirtyOptions dirtyOptions,
- @Nullable DirtySink<String> dirtySink) {
+ @Nullable DirtySink<String> dirtySink,
+ final String auditKeys) {
// Format attributes
this.physicalDataType =
Preconditions.checkNotNull(
@@ -239,6 +242,7 @@ public class KafkaDynamicSource
this.auditHostAndPorts = auditHostAndPorts;
this.dirtyOptions = dirtyOptions;
this.dirtySink = dirtySink;
+ this.auditKeys = auditKeys;
}
@Override
@@ -259,7 +263,7 @@ public class KafkaDynamicSource
final FlinkKafkaConsumer<RowData> kafkaConsumer =
createKafkaConsumer(keyDeserialization, valueDeserialization,
- producedTypeInfo, inlongMetric, auditHostAndPorts);
+ producedTypeInfo, inlongMetric, auditHostAndPorts,
auditKeys);
return SourceFunctionProvider.of(kafkaConsumer, false);
}
@@ -333,7 +337,8 @@ public class KafkaDynamicSource
inlongMetric,
auditHostAndPorts,
dirtyOptions,
- dirtySink);
+ dirtySink,
+ auditKeys);
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
@@ -400,7 +405,8 @@ public class KafkaDynamicSource
DeserializationSchema<RowData> valueDeserialization,
TypeInformation<RowData> producedTypeInfo,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ String auditKeys) {
final MetadataConverter[] metadataConverters =
metadataKeys.stream()
@@ -445,10 +451,10 @@ public class KafkaDynamicSource
final FlinkKafkaConsumer<RowData> kafkaConsumer;
if (topics != null) {
kafkaConsumer = new FlinkKafkaConsumer<>(topics,
kafkaDeserializer, properties, inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts, auditKeys);
} else {
kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern,
kafkaDeserializer, properties, inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts, auditKeys);
}
switch (startupMode) {
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index f2d48e95a..f4014bb3e 100644
---
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -363,6 +363,9 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
final String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
final String auditHostAndPorts =
tableOptions.getOptional(INLONG_AUDIT).orElse(null);
+
+ final String auditKeys =
tableOptions.getOptional(AUDIT_KEYS).orElse(null);
+
// Build the dirty data side-output
final DirtyOptions dirtyOptions =
DirtyOptions.fromConfig(tableOptions);
final DirtySink<String> dirtySink =
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
@@ -382,7 +385,8 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
inlongMetric,
auditHostAndPorts,
dirtyOptions,
- dirtySink);
+ dirtySink,
+ auditKeys);
}
@Override
@@ -493,7 +497,8 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
String inlongMetric,
String auditHostAndPorts,
DirtyOptions dirtyOptions,
- @Nullable DirtySink<String> dirtySink) {
+ @Nullable DirtySink<String> dirtySink,
+ String auditKeys) {
return new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
@@ -511,7 +516,8 @@ public class KafkaDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
inlongMetric,
auditHostAndPorts,
dirtyOptions,
- dirtySink);
+ dirtySink,
+ auditKeys);
}
protected KafkaDynamicSink createKafkaTableSink(
diff --git
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index 73507aa6c..c0d228001 100644
---
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -69,6 +69,7 @@ import static
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.aut
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -219,6 +220,7 @@ public class UpsertKafkaDynamicTableFactory
final DirtySink<String> dirtySink =
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
final String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
final String auditHostAndPorts =
tableOptions.getOptional(INLONG_AUDIT).orElse(null);
+ final String auditKeys =
tableOptions.getOptional(AUDIT_KEYS).orElse(null);
return new KafkaDynamicSource(
schema.toPhysicalRowDataType(),
keyDecodingFormat,
@@ -236,7 +238,8 @@ public class UpsertKafkaDynamicTableFactory
inlongMetric,
auditHostAndPorts,
dirtyOptions,
- dirtySink);
+ dirtySink,
+ auditKeys);
}
@Override
diff --git a/inlong-sort/sort-connectors/pulsar/pom.xml
b/inlong-sort/sort-connectors/pulsar/pom.xml
index 5d5e3da54..c7b493def 100644
--- a/inlong-sort/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-connectors/pulsar/pom.xml
@@ -62,12 +62,6 @@
<artifactId>audit-sdk</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>sort-format-base</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
</dependencies>
<build>
diff --git
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index f3c71e029..31a5f57be 100644
---
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -29,8 +29,8 @@ import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceMetricData;
-import org.apache.inlong.sort.pulsar.withoutadmin.MetricsCollector;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;