This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7e2897c983 [INLONG-10573][Sort] Make pulsar source connector report
audit attach input time (#10574)
7e2897c983 is described below
commit 7e2897c983ad61d656e6ab83b2844add820c1b5f
Author: XiaoYou201 <[email protected]>
AuthorDate: Tue Jul 9 17:00:42 2024 +0800
[INLONG-10573][Sort] Make pulsar source connector report audit attach input
time (#10574)
---
.../org/apache/inlong/sort/pulsar/PulsarTableFactory.java | 6 ------
.../inlong/sort/pulsar/table/PulsarReadableMetadata.java | 15 +++++++++++++--
.../inlong/sort/pulsar/table/PulsarRowDataConverter.java | 2 +-
.../pulsar/table/PulsarTableDeserializationSchema.java | 15 +++------------
.../table/PulsarTableDeserializationSchemaFactory.java | 6 ------
5 files changed, 17 insertions(+), 27 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
index 80ef5b6363..780dfde8b5 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/PulsarTableFactory.java
@@ -47,7 +47,6 @@ import java.util.stream.Stream;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
-import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
@@ -95,8 +94,6 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory {
public static final boolean UPSERT_DISABLED = false;
- public static boolean innerFormat = false;
-
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
@@ -107,8 +104,6 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory {
getValueDecodingFormat(helper);
ReadableConfig tableOptions = helper.getOptions();
- innerFormat = ExtractNode.INLONG_MSG.equals(tableOptions.get(FORMAT));
-
// Validate configs are not conflict; each options is consumed; no
unwanted configs
// PulsarOptions, PulsarSourceOptions and PulsarSinkOptions is not
part of the validation.
helper.validateExcept(
@@ -160,7 +155,6 @@ public class PulsarTableFactory implements
DynamicTableSourceFactory {
valueDecodingFormat,
valueProjection,
UPSERT_DISABLED,
- innerFormat,
inlongMetric,
auditHostAndPorts,
auditKeys);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java
index 6740e6111f..5a40bb1896 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarReadableMetadata.java
@@ -17,14 +17,17 @@
package org.apache.inlong.sort.pulsar.table;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Collector;
import org.apache.pulsar.client.api.Message;
import java.io.Serializable;
@@ -34,6 +37,8 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.inlong.sort.pulsar.table.PulsarReadableMetadata.ReadableMetadata.CONSUME_TIME;
+
/**
* Class for reading metadata fields from a Pulsar message and put in
corresponding Flink row
* fields.
@@ -66,10 +71,16 @@ public class PulsarReadableMetadata implements Serializable
{
}
public void appendProducedRowWithMetadata(
- GenericRowData producedRowData, int physicalArity, Message<?>
message) {
+ GenericRowData producedRowData, int physicalArity, Message<?>
message, Collector<RowData> collector) {
for (int metadataPos = 0; metadataPos < metadataConverters.size();
metadataPos++) {
+ Object metadata =
metadataConverters.get(metadataPos).read(message);
producedRowData.setField(
- physicalArity + metadataPos,
metadataConverters.get(metadataPos).read(message));
+ physicalArity + metadataPos, metadata);
+ if
(CONSUME_TIME.key.equals(connectorMetadataKeys.get(metadataPos)) &&
+ collector instanceof MetricsCollector) {
+ ((MetricsCollector<RowData>) collector).resetTimestamp((Long)
metadata);
+ }
+
}
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
index 2f87ca1564..c66e25c225 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
@@ -122,7 +122,7 @@ public class PulsarRowDataConverter implements Serializable
{
producedRow.setField(keyProjection[keyPos],
physicalKeyRow.getField(keyPos));
}
- readableMetadata.appendProducedRowWithMetadata(producedRow,
physicalArity, message);
+ readableMetadata.appendProducedRowWithMetadata(producedRow,
physicalArity, message, collector);
collector.collect(producedRow);
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
index 554c81a33f..c05f485af6 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
@@ -57,8 +57,6 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
private final boolean upsertMode;
- private final boolean innerFormat;
-
private SourceExactlyMetric sourceExactlyMetric;
private MetricOption metricOption;
@@ -69,7 +67,6 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
TypeInformation<RowData> producedTypeInfo,
PulsarRowDataConverter rowDataConverter,
boolean upsertMode,
- boolean innerFormat,
MetricOption metricOption) {
if (upsertMode) {
checkNotNull(keyDeserialization, "upsert mode must specify a key
format");
@@ -79,7 +76,6 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
this.rowDataConverter = checkNotNull(rowDataConverter);
this.producedTypeInfo = checkNotNull(producedTypeInfo);
this.upsertMode = upsertMode;
- this.innerFormat = innerFormat;
this.metricOption = metricOption;
}
@@ -113,17 +109,12 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
}
MetricsCollector<RowData> metricsCollector =
- new MetricsCollector<>(new ListCollector<>(valueRowData),
sourceExactlyMetric);
-
- // reset timestamp if the deserialize schema has not inner format
- if (!innerFormat) {
- metricsCollector.resetTimestamp(System.currentTimeMillis());
- }
+ new MetricsCollector<>(collector, sourceExactlyMetric);
- valueDeserialization.deserialize(message.getData(), metricsCollector);
+ valueDeserialization.deserialize(message.getData(), new
ListCollector<>(valueRowData));
rowDataConverter.projectToProducedRowAndCollect(
- message, keyRowData, valueRowData, collector);
+ message, keyRowData, valueRowData, metricsCollector);
}
@Override
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
index c193a34857..97148b47f6 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java
@@ -80,8 +80,6 @@ public class PulsarTableDeserializationSchemaFactory
implements Serializable {
private final int[] valueProjection;
- private final boolean innerFormat;
-
//
--------------------------------------------------------------------------------------------
// Mutable attributes. Will be updated after the applyReadableMetadata()
//
--------------------------------------------------------------------------------------------
@@ -103,7 +101,6 @@ public class PulsarTableDeserializationSchemaFactory
implements Serializable {
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
int[] valueProjection,
boolean upsertMode,
- boolean innerFormat,
String inlongMetric,
String auditHostAndPorts,
String auditKeys) {
@@ -119,8 +116,6 @@ public class PulsarTableDeserializationSchemaFactory
implements Serializable {
this.producedDataType = physicalDataType;
this.connectorMetadataKeys = Collections.emptyList();
this.upsertMode = upsertMode;
- this.innerFormat = innerFormat;
-
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
this.auditKeys = auditKeys;
@@ -182,7 +177,6 @@ public class PulsarTableDeserializationSchemaFactory
implements Serializable {
producedTypeInfo,
rowDataConverter,
upsertMode,
- innerFormat,
metricOption);
}