This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new da4e183247 [INLONG-9064][Sort] Add Audit report for Pulsar connector in flink 1.15 (#9080)
da4e183247 is described below
commit da4e1832473e55dbb32e54ea27ccac38ca0b742e
Author: Sting <[email protected]>
AuthorDate: Sun Oct 22 14:57:38 2023 +0800
[INLONG-9064][Sort] Add Audit report for Pulsar connector in flink 1.15 (#9080)
---
.../protocol/node/extract/PulsarExtractNode.java | 1 -
.../inlong/sort/base/metric/SinkMetricData.java | 4 ++-
.../inlong/sort/base/metric/SourceMetricData.java | 23 ++++++++++++++---
.../pulsar/table/PulsarDynamicTableFactory.java | 18 +++----------
.../sort-connectors/pulsar/pom.xml | 6 +++++
.../inlong/sort/pulsar/PulsarTableFactory.java | 18 ++++++++++---
.../table/PulsarTableDeserializationSchema.java | 12 +++++++--
.../PulsarTableDeserializationSchemaFactory.java | 30 ++++++++++++++++++++--
8 files changed, 85 insertions(+), 27 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
index ead53f0e7c..cba35f8683 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java
@@ -112,7 +112,6 @@ public class PulsarExtractNode extends ExtractNode
implements InlongMetric {
if (adminUrl != null) {
options.put("admin-url", adminUrl);
}
- options.put("generic", "true");
options.put("service-url", serviceUrl);
options.put("topic", topic);
options.put("scan.startup.mode", scanStartupMode);
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index fa2aee68d3..4b48c4ece2 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -25,6 +25,7 @@ import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -41,8 +42,9 @@ import static
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataS
/**
* A collection class for handling metrics
*/
-public class SinkMetricData implements MetricData {
+public class SinkMetricData implements MetricData, Serializable {
+ private static final long serialVersionUID = 1L;
private final MetricGroup metricGroup;
private final Map<String, String> labels;
private final RegisteredMetric registeredMetric;
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index ec64f53b39..0a0cad5d6e 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -27,6 +27,7 @@ import org.apache.flink.metrics.SimpleCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -43,10 +44,11 @@ import static
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataS
/**
* A collection class for handling metrics
*/
-public class SourceMetricData implements MetricData {
+public class SourceMetricData implements MetricData, Serializable {
+ private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(SourceMetricData.class);
- private final MetricGroup metricGroup;
+ private MetricGroup metricGroup;
private final Map<String, String> labels;
private Counter numRecordsIn;
private Counter numBytesIn;
@@ -108,6 +110,16 @@ public class SourceMetricData implements MetricData {
}
}
+ public SourceMetricData(MetricOption option) {
+ this.labels = option.getLabels();
+
+ if (option.getIpPorts().isPresent()) {
+ AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
+ this.auditOperator = AuditOperator.getInstance();
+ this.auditKeys = option.getInlongAuditKeys();
+ }
+ }
+
/**
* Default counter is {@link SimpleCounter}
* groupId and streamId and nodeId are label value, user can use it filter
metric data when use metric reporter
@@ -288,20 +300,23 @@ public class SourceMetricData implements MetricData {
public void outputMetrics(long rowCountSize, long rowDataSize, long
dataTime) {
outputDefaultMetrics(rowCountSize, rowDataSize);
-
if (auditOperator != null) {
for (Integer key : auditKeys) {
auditOperator.add(
key,
getGroupId(),
getStreamId(),
- dataTime,
+ getCurrentOrProvidedTime(dataTime),
rowCountSize,
rowDataSize);
}
}
}
+ private long getCurrentOrProvidedTime(long dataTime) {
+ return dataTime == 0 ? System.currentTimeMillis() : dataTime;
+ }
+
private void outputDefaultMetrics(long rowCountSize, long rowDataSize) {
if (numRecordsIn != null) {
this.numRecordsIn.inc(rowCountSize);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index 79af4e0d43..dd06bfe758 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -39,7 +39,6 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
-import org.apache.pulsar.common.naming.TopicName;
import javax.annotation.Nullable;
@@ -163,7 +162,7 @@ public class PulsarDynamicTableFactory
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
ReadableConfig tableOptions = helper.getOptions();
- List<String> topics = generateTopic(context.getObjectIdentifier(),
tableOptions);
+ List<String> topics = generateTopic(tableOptions);
if (topics != null && !topics.isEmpty()) {
((Configuration) tableOptions).set(TOPIC,
Collections.singletonList(topics.get(0)));
}
@@ -231,7 +230,7 @@ public class PulsarDynamicTableFactory
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
ReadableConfig tableOptions = helper.getOptions();
- List<String> topics = generateTopic(context.getObjectIdentifier(),
tableOptions);
+ List<String> topics = generateTopic(tableOptions);
if (topics != null && !topics.isEmpty()) {
((Configuration) tableOptions).set(TOPIC,
Collections.singletonList(topics.get(0)));
}
@@ -338,17 +337,8 @@ public class PulsarDynamicTableFactory
return options;
}
- private List<String> generateTopic(ObjectIdentifier table, ReadableConfig
tableOptions) {
- List<String> topics = null;
- if (tableOptions.get(GENERIC)) {
- topics = tableOptions.getOptional(TOPIC).orElse(null);
- } else {
- String rawTopic = table.getDatabaseName() + "/" +
table.getObjectName();
- final String topic = TopicName.get(rawTopic).toString();
- topics = Collections.singletonList(topic);
- }
-
- return topics;
+ private List<String> generateTopic(ReadableConfig tableOptions) {
+ return tableOptions.getOptional(TOPIC).orElse(null);
}
//
--------------------------------------------------------------------------------------------
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
index 501d39c637..9fc0949b46 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml
@@ -38,6 +38,12 @@
<dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
index 4adc2e64ba..6784fc6790 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
@@ -47,6 +47,9 @@ import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULS
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.createKeyFormatProjection;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.createValueFormatProjection;
import static
org.apache.inlong.sort.pulsar.PulsarTableOptionUtils.getKeyDecodingFormat;
@@ -137,6 +140,10 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory {
final int[] valueProjection =
createValueFormatProjection(tableOptions, physicalDataType);
final int[] keyProjection = createKeyFormatProjection(tableOptions,
physicalDataType);
+ String inlongMetric =
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+ String auditKeys = tableOptions.get(AUDIT_KEYS);
+
final PulsarTableDeserializationSchemaFactory
deserializationSchemaFactory =
new PulsarTableDeserializationSchemaFactory(
physicalDataType,
@@ -144,7 +151,10 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory {
keyProjection,
valueDecodingFormat,
valueProjection,
- UPSERT_DISABLED);
+ UPSERT_DISABLED,
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
// Set default values for configuration not exposed to user.
final DecodingFormat<DeserializationSchema<RowData>>
decodingFormatForMetadataPushdown =
@@ -190,8 +200,10 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory {
SINK_PARALLELISM,
KEY_FORMAT,
KEY_FIELDS,
- EXPLICIT)
- .collect(Collectors.toSet());
+ EXPLICIT,
+ AUDIT_KEYS,
+ INLONG_METRIC,
+ INLONG_AUDIT).collect(Collectors.toSet());
}
/** Format and Delivery guarantee related options are not forward options.
*/
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
index 8377fce389..237de45b4d 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
@@ -17,6 +17,9 @@
package org.apache.inlong.sort.pulsar.table;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -53,12 +56,15 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
private final boolean upsertMode;
+ private SourceMetricData sourceMetricData;
+
public PulsarTableDeserializationSchema(
@Nullable DeserializationSchema<RowData> keyDeserialization,
DeserializationSchema<RowData> valueDeserialization,
TypeInformation<RowData> producedTypeInfo,
PulsarRowDataConverter rowDataConverter,
- boolean upsertMode) {
+ boolean upsertMode,
+ SourceMetricData sourceMetricData) {
if (upsertMode) {
checkNotNull(keyDeserialization, "upsert mode must specify a key
format");
}
@@ -67,6 +73,7 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
this.rowDataConverter = checkNotNull(rowDataConverter);
this.producedTypeInfo = checkNotNull(producedTypeInfo);
this.upsertMode = upsertMode;
+ this.sourceMetricData = sourceMetricData;
}
@Override
@@ -96,7 +103,8 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
return;
}
- valueDeserialization.deserialize(message.getData(), new
ListCollector<>(valueRowData));
+ valueDeserialization.deserialize(message.getData(),
+ new MetricsCollector<>(new ListCollector<>(valueRowData),
sourceMetricData));
rowDataConverter.projectToProducedRowAndCollect(
message, keyRowData, valueRowData, collector);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
index 671e82e0d3..360b07aa69 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
@@ -17,6 +17,9 @@
package org.apache.inlong.sort.pulsar.table;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
@@ -86,13 +89,21 @@ public class PulsarTableDeserializationSchemaFactory
implements Serializable {
private final boolean upsertMode;
+ private String inlongMetric;
+ private String auditHostAndPorts;
+ private String auditKeys;
+ private SourceMetricData sourceMetricData;
+
public PulsarTableDeserializationSchemaFactory(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>>
keyDecodingFormat,
int[] keyProjection,
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
int[] valueProjection,
- boolean upsertMode) {
+ boolean upsertMode,
+ String inlongMetric,
+ String auditHostAndPorts,
+ String auditKeys) {
this.physicalDataType =
checkNotNull(physicalDataType, "field physicalDataType must
not be null.");
this.keyDecodingFormat = keyDecodingFormat;
@@ -105,6 +116,10 @@ public class PulsarTableDeserializationSchemaFactory
implements Serializable {
this.producedDataType = physicalDataType;
this.connectorMetadataKeys = Collections.emptyList();
this.upsertMode = upsertMode;
+
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.auditKeys = auditKeys;
}
private @Nullable DeserializationSchema<RowData> createDeserialization(
@@ -151,12 +166,23 @@ public class PulsarTableDeserializationSchemaFactory
implements Serializable {
readableMetadata,
upsertMode);
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
+
+ if (metricOption != null) {
+ sourceMetricData = new SourceMetricData(metricOption);
+ }
+
return new PulsarTableDeserializationSchema(
keyDeserialization,
valueDeserialization,
producedTypeInfo,
rowDataConverter,
- upsertMode);
+ upsertMode,
+ sourceMetricData);
}
public void setProducedDataType(DataType producedDataType) {