This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0e0fa5bf5 [INLONG-6925][Sort] Add read phase metric and table level metric for MongoDB-CDC (#7069)
0e0fa5bf5 is described below
commit 0e0fa5bf5185d1b5ac2b1feb78fd82d4a872acfc
Author: kuansix <[email protected]>
AuthorDate: Tue Dec 27 12:32:22 2022 +0800
[INLONG-6925][Sort] Add read phase metric and table level metric for MongoDB-CDC (#7069)
---
.../org/apache/inlong/sort/base/Constants.java | 4 +++
.../sort/cdc/mongodb/DebeziumSourceFunction.java | 38 +++++++++++++++++++---
.../inlong/sort/cdc/mongodb/MongoDBSource.java | 9 ++++-
.../MongoDBConnectorDeserializationSchema.java | 8 ++---
.../cdc/mongodb/debezium/utils/RecordUtils.java | 1 +
.../sort/cdc/mongodb/table/MongoDBTableSource.java | 1 +
6 files changed, 51 insertions(+), 10 deletions(-)
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 10fc5ea43..7af393fe7 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -84,6 +84,10 @@ public final class Constants {
* Table Name used in inlong metric
*/
public static final String TABLE_NAME = "table";
+ /**
+ * Collection Name used in inlong metric
+ */
+ public static final String COLLECTION_NAME = "collection";
/**
* Read Phase used in inlong metric
*/
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index d79188e99..9ef76410f 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -17,7 +17,9 @@
package org.apache.inlong.sort.cdc.mongodb;
+import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import com.ververica.cdc.debezium.Validator;
+import io.debezium.connector.SnapshotRecord;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.embedded.Connect;
@@ -47,10 +49,12 @@ import
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import
org.apache.inlong.sort.cdc.mongodb.debezium.DebeziumDeserializationSchema;
import
org.apache.inlong.sort.cdc.mongodb.debezium.internal.DebeziumChangeConsumer;
@@ -63,6 +67,8 @@ import
org.apache.inlong.sort.cdc.mongodb.debezium.internal.FlinkOffsetBackingSt
import org.apache.inlong.sort.cdc.mongodb.debezium.internal.Handover;
import org.apache.inlong.sort.cdc.mongodb.debezium.internal.SchemaRecord;
import org.apache.inlong.sort.cdc.mongodb.debezium.utils.DatabaseHistoryUtil;
+import org.apache.inlong.sort.cdc.mongodb.debezium.utils.RecordUtils;
+import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +77,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.UUID;
@@ -231,7 +238,9 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
private String inlongAudit;
- private SourceMetricData sourceMetricData;
+ private SourceTableMetricData sourceMetricData;
+
+ private boolean migrateAll;
private transient ListState<MetricState> metricStateListState;
@@ -243,13 +252,15 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@Nullable DebeziumOffset specificOffset,
- Validator validator, String inlongMetric, String inlongAudit) {
+ Validator validator, String inlongMetric,
+ String inlongAudit, boolean migrateAll) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
this.inlongMetric = inlongMetric;
this.inlongAudit = inlongAudit;
+ this.migrateAll = migrateAll;
}
@Override
@@ -445,7 +456,12 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption, metricGroup);
+ sourceMetricData = new SourceTableMetricData(metricOption,
metricGroup,
+ Arrays.asList(Constants.DATABASE_NAME,
Constants.COLLECTION_NAME));
+ if (migrateAll) {
+ // register sub source metric data from metric state
+ sourceMetricData.registerSubMetricsGroup(metricState);
+ }
}
properties.setProperty("name", "engine");
properties.setProperty("offset.storage",
FlinkOffsetBackingStore.class.getCanonicalName());
@@ -485,7 +501,21 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
@Override
public void deserialize(SourceRecord record,
Collector<T> out) throws Exception {
- if (sourceMetricData != null) {
+ if (sourceMetricData != null && record != null
&& migrateAll) {
+ Struct value = (Struct) record.value();
+ Struct source =
value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
+ if (null == source) {
+ source =
value.getStruct(RecordUtils.DOCUMENT_TO_FIELD);
+ }
+ String dbName =
source.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
+ String collectionName =
+
source.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
+ SnapshotRecord snapshotRecord =
SnapshotRecord.fromSource(source);
+ boolean isSnapshotRecord =
(SnapshotRecord.TRUE == snapshotRecord);
+ sourceMetricData
+ .outputMetricsWithEstimate(new
String[]{dbName, collectionName},
+ isSnapshotRecord, value);
+ } else if (sourceMetricData != null && record
!= null) {
sourceMetricData.outputMetricsWithEstimate(record.value());
}
deserializer.deserialize(record, out);
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
index ba6fc0862..be30c827d 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/MongoDBSource.java
@@ -140,6 +140,7 @@ public class MongoDBSource {
private DebeziumDeserializationSchema<T> deserializer;
private String inlongMetric;
private String inlongAudit;
+ private boolean migrateAll;
/** The comma-separated list of hostname and port pairs of mongodb
servers. */
public Builder<T> hosts(String hosts) {
@@ -327,6 +328,11 @@ public class MongoDBSource {
return this;
}
+ public Builder<T> migrateAll(Boolean migrateAll) {
+ this.migrateAll = migrateAll;
+ return this;
+ }
+
/** Build connection uri. */
@VisibleForTesting
public ConnectionString buildConnectionUri() {
@@ -446,7 +452,8 @@ public class MongoDBSource {
Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(),
HEARTBEAT_TOPIC_NAME_DEFAULT);
return new DebeziumSourceFunction<>(
- deserializer, props, null,
Validator.getDefaultValidator(), inlongMetric, inlongAudit);
+ deserializer, props, null, Validator.getDefaultValidator(),
+ inlongMetric, inlongAudit, migrateAll);
}
}
}
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
index bdb6e18ed..dc1f71430 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
@@ -91,8 +91,6 @@ public class MongoDBConnectorDeserializationSchema
/** TypeInformation of the produced {@link RowData}. */
private final TypeInformation<RowData> resultTypeInfo;
- public static final String DOCUMENT_TO_FIELD = "to";
-
/** Local Time zone. */
private final ZoneId localTimeZone;
@@ -125,8 +123,7 @@ public class MongoDBConnectorDeserializationSchema
ZoneId localTimeZone,
RowKindValidator rowValidator,
boolean sourceMultipleEnable) {
- // this.hasMetadata = checkNotNull(metadataConverters).length > 0;
- this.hasMetadata = true;
+ this.hasMetadata = checkNotNull(metadataConverters).length > 0;
this.sourceMultipleEnable = sourceMultipleEnable;
this.appendMetadataCollector = new
AppendMetadataCollector(metadataConverters, sourceMultipleEnable);
this.physicalConverter = createConverter(physicalDataType);
@@ -214,7 +211,8 @@ public class MongoDBConnectorDeserializationSchema
if (!rowKindValidator.validate(MongoRowKind.RENAME)) {
return;
}
- GenericRowData rename = extractMongoDMLData(value,
DOCUMENT_TO_FIELD, OperationType.RENAME.getValue());
+ GenericRowData rename =
+ extractMongoDMLData(value,
RecordUtils.DOCUMENT_TO_FIELD, OperationType.RENAME.getValue());
rename.setRowKind(RowKind.INSERT);
emit(record, rename, out);
break;
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
index 4f23c6a06..fa0def5b4 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/utils/RecordUtils.java
@@ -44,6 +44,7 @@ import org.bson.BsonValue;
*/
public class RecordUtils {
+ public static final String DOCUMENT_TO_FIELD = "to";
private static final List<BsonType> INT_TYPE =
Arrays.asList(BsonType.INT32, BsonType.INT64);
private static final List<BsonType> BOOL_TYPE =
Arrays.asList(BsonType.BOOLEAN);
private static final List<BsonType> DOUBLE_TYPE =
Arrays.asList(BsonType.DOUBLE, BsonType.DECIMAL128);
diff --git
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
index dd6f319dc..2bddb9ace 100644
---
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
+++
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSource.java
@@ -198,6 +198,7 @@ public class MongoDBTableSource implements ScanTableSource,
SupportsReadingMetad
Optional.ofNullable(heartbeatIntervalMillis).ifPresent(builder::heartbeatIntervalMillis);
Optional.ofNullable(inlongMetric).ifPresent(builder::inlongMetric);
Optional.ofNullable(inlongAudit).ifPresent(builder::inlongAudit);
+
Optional.ofNullable(sourceMultipleEnable).ifPresent(builder::migrateAll);
DebeziumSourceFunction<RowData> sourceFunction = builder.build();
return SourceFunctionProvider.of(sourceFunction, false);