This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 67676d8631 [INLONG-11355][Sort] Add new source metrics for sort-connector-mongodb-cdc-v1.15 (#11356)
67676d8631 is described below
commit 67676d863162206c0e52912ec64cc0d18a81e3f3
Author: PeterZh6 <[email protected]>
AuthorDate: Wed Oct 16 17:21:09 2024 +0800
[INLONG-11355][Sort] Add new source metrics for sort-connector-mongodb-cdc-v1.15 (#11356)
---
.../sort/mongodb/DebeziumSourceFunction.java | 70 ++++++++++---
.../MongoDBConnectorDeserializationSchema.java | 110 ++++++++++++---------
.../apache/inlong/sort/mongodb/MongoDBSource.java | 16 ++-
.../inlong/sort/mongodb/MongoDBTableSource.java | 4 +-
.../sort/mongodb/source/MongoDBSourceBuilder.java | 8 ++
5 files changed, 147 insertions(+), 61 deletions(-)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
index 2d7191525b..46541295e7 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
@@ -17,6 +17,9 @@
package org.apache.inlong.sort.mongodb;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
+
import com.ververica.cdc.debezium.Validator;
import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
@@ -61,6 +64,8 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -197,17 +202,25 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;
+ private transient SourceExactlyMetric sourceExactlyMetric;
+
+ private final MetricOption metricOption;
+
+ private transient Map<Long, Long> checkpointStartTimeMap;
+
// ---------------------------------------------------------------------------------------
public DebeziumSourceFunction(
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@Nullable DebeziumOffset specificOffset,
- Validator validator) {
+ Validator validator,
+ MetricOption metricOption) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
+ this.metricOption = metricOption;
}
@Override
@@ -220,6 +233,14 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
+ if (metricOption != null) {
+ sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
+ }
+ this.checkpointStartTimeMap = new HashMap<>();
+ // set sourceExactlyMetric for deserializer
+ if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
+ ((MongoDBConnectorDeserializationSchema) deserializer).setSourceExactlyMetric(sourceExactlyMetric);
+ }
}
// ------------------------------------------------------------------------
@@ -304,17 +325,32 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
- if (handover.hasError()) {
- LOG.debug("snapshotState() called on closed source");
- throw new FlinkRuntimeException(
- "Call snapshotState() on closed source, checkpoint
failed.");
- } else {
- snapshotOffsetState(functionSnapshotContext.getCheckpointId());
- snapshotHistoryRecordsState();
- }
- if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
- ((MongoDBConnectorDeserializationSchema) deserializer)
- .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+ try {
+ if (handover.hasError()) {
+ LOG.debug("snapshotState() called on closed source");
+ throw new FlinkRuntimeException(
+ "Call snapshotState() on closed source, checkpoint
failed.");
+ } else {
+ snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+ snapshotHistoryRecordsState();
+ }
+ if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
+ ((MongoDBConnectorDeserializationSchema) deserializer)
+ .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+ }
+ if (checkpointStartTimeMap != null) {
+ checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis());
+ } else {
+ LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
+ }
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumSnapshotCreate();
+ }
+ } catch (Exception e) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumDeserializeError();
+ }
+ throw e;
}
}
@@ -496,6 +532,16 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
schema.flushAudit();
schema.updateLastCheckpointId(checkpointId);
}
+ if (checkpointStartTimeMap != null) {
+ Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
+ if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumSnapshotComplete();
+ sourceExactlyMetric
+ .recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
+ }
+ } else {
+ LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
+ }
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
index daa8dccb79..668f6de4cb 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
@@ -137,53 +137,66 @@ public class MongoDBConnectorDeserializationSchema implements DebeziumDeserializ
@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
- Struct value = (Struct) record.value();
- Schema valueSchema = record.valueSchema();
-
- OperationType op = operationTypeFor(record);
- BsonDocument documentKey =
- checkNotNull(
- extractBsonDocument(
- value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD));
- BsonDocument fullDocument =
- extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD);
- switch (op) {
- case INSERT:
- GenericRowData insert = extractRowData(fullDocument);
- insert.setRowKind(RowKind.INSERT);
- emit(record, insert,
- sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
- break;
- case DELETE:
- GenericRowData delete = extractRowData(documentKey);
- delete.setRowKind(RowKind.DELETE);
- emit(record, delete,
- sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
- break;
- case UPDATE:
- // It’s null if another operation deletes the document
- // before the lookup operation happens. Ignored it.
- if (fullDocument == null) {
+ long deserializeStartTime = System.currentTimeMillis();
+ try {
+ Struct value = (Struct) record.value();
+ Schema valueSchema = record.valueSchema();
+
+ OperationType op = operationTypeFor(record);
+ BsonDocument documentKey =
+ checkNotNull(
+ extractBsonDocument(
+ value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD));
+ BsonDocument fullDocument =
+ extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD);
+ switch (op) {
+ case INSERT:
+ GenericRowData insert = extractRowData(fullDocument);
+ insert.setRowKind(RowKind.INSERT);
+ emit(record, insert,
+ sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
- }
- GenericRowData updateAfter = extractRowData(fullDocument);
- updateAfter.setRowKind(RowKind.UPDATE_AFTER);
- emit(record, updateAfter,
- sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
- break;
- case REPLACE:
- GenericRowData replaceAfter = extractRowData(fullDocument);
- replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
- emit(record, replaceAfter,
- sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
- break;
- case INVALIDATE:
- case DROP:
- case DROP_DATABASE:
- case RENAME:
- case OTHER:
- default:
- break;
+ case DELETE:
+ GenericRowData delete = extractRowData(documentKey);
+ delete.setRowKind(RowKind.DELETE);
+ emit(record, delete,
+ sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
+ break;
+ case UPDATE:
+ // It’s null if another operation deletes the document
+ // before the lookup operation happens. Ignore it.
+ if (fullDocument == null) {
+ break;
+ }
+ GenericRowData updateAfter = extractRowData(fullDocument);
+ updateAfter.setRowKind(RowKind.UPDATE_AFTER);
+ emit(record, updateAfter,
+ sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
+ break;
+ case REPLACE:
+ GenericRowData replaceAfter = extractRowData(fullDocument);
+ replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
+ emit(record, replaceAfter,
+ sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
+ break;
+ case INVALIDATE:
+ case DROP:
+ case DROP_DATABASE:
+ case RENAME:
+ case OTHER:
+ default:
+ break;
+ }
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumDeserializeSuccess();
+ sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
+ }
+
+ } catch (Exception e) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumDeserializeError();
+ }
+ throw e;
}
}
@@ -827,4 +840,9 @@ public class MongoDBConnectorDeserializationSchema implements DebeziumDeserializ
sourceExactlyMetric.updateLastCheckpointId(checkpointId);
}
}
+
+ /** Setter for DebeziumSourceFunction to set the SourceExactlyMetric. */
+ public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) {
+ this.sourceExactlyMetric = sourceExactlyMetric;
+ }
}
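
The reworked deserialize() above wraps the whole method in a try/catch so that every record either increments the success counter (plus a delay sample) or increments the error counter before the exception is rethrown. A generic sketch of that accounting pattern (the Metrics interface is a stand-in for SourceExactlyMetric, not InLong API):

    import java.util.concurrent.Callable;

    // Stand-in for the subset of SourceExactlyMetric used in deserialize().
    interface Metrics {
        void incNumDeserializeSuccess();
        void incNumDeserializeError();
        void recordDeserializeDelay(long millis);
    }

    final class MeteredCall {
        // Times body, counts success or error, and always rethrows failures
        // so callers still see the original exception.
        static <T> T run(Metrics metrics, Callable<T> body) throws Exception {
            long start = System.currentTimeMillis();
            try {
                T result = body.call();
                if (metrics != null) {
                    metrics.incNumDeserializeSuccess();
                    metrics.recordDeserializeDelay(System.currentTimeMillis() - start);
                }
                return result;
            } catch (Exception e) {
                if (metrics != null) {
                    metrics.incNumDeserializeError();
                }
                throw e;
            }
        }
    }
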
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java
index f9aab2d54f..67bb51bf69 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.mongodb;
+import org.apache.inlong.sort.base.metric.MetricOption;
+
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
@@ -35,7 +37,11 @@ import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSour
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
-import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.*;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.buildConnectionString;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -76,6 +82,7 @@ public class MongoDBSource {
private String copyExistingPipeline;
private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
private DebeziumDeserializationSchema<T> deserializer;
+ private MetricOption metricOption;
/** The comma-separated list of hostname and port pairs of mongodb servers. */
public Builder<T> hosts(String hosts) {
@@ -243,6 +250,11 @@ public class MongoDBSource {
return this;
}
+ public Builder<T> metricOption(MetricOption metricOption) {
+ this.metricOption = metricOption;
+ return this;
+ }
+
/**
* The properties of mongodb kafka connector.
* https://docs.mongodb.com/kafka-connector/current/kafka-source
@@ -338,7 +350,7 @@ public class MongoDBSource {
MongoSourceConfig.ERRORS_TOLERANCE_CONFIG,
ErrorTolerance.NONE.value());
return new DebeziumSourceFunction<>(
- deserializer, props, null, Validator.getDefaultValidator());
+ deserializer, props, null, Validator.getDefaultValidator(), metricOption);
}
}
}
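
With the new metricOption(...) builder method, a job using this legacy SourceFunction-based source can thread the metric option straight through. A hedged usage sketch: the host, database, and collection values are placeholders, the caller supplies the MetricOption and deserializer, and build() returning DebeziumSourceFunction is inferred from the hunk above rather than shown in full.

    import org.apache.flink.table.data.RowData;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
    import org.apache.inlong.sort.mongodb.DebeziumSourceFunction;
    import org.apache.inlong.sort.mongodb.MongoDBSource;

    class LegacySourceWiring {
        // 'metricOption' and 'deserializer' come from the caller; the
        // connection settings below are placeholders.
        static DebeziumSourceFunction<RowData> build(
                MetricOption metricOption, DebeziumDeserializationSchema<RowData> deserializer) {
            return MongoDBSource.<RowData>builder()
                    .hosts("localhost:27017")
                    .databaseList("test_db")
                    .collectionList("test_db.test_collection")
                    .deserializer(deserializer)
                    .metricOption(metricOption) // new in this commit
                    .build();
        }
    }
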
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
index 9c417b4edf..a161077b0a 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
@@ -191,13 +191,15 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
.ifPresent(builder::heartbeatIntervalMillis);
Optional.ofNullable(splitMetaGroupSize).ifPresent(builder::splitMetaGroupSize);
Optional.ofNullable(splitSizeMB).ifPresent(builder::splitSizeMB);
+ Optional.ofNullable(metricOption).ifPresent(builder::metricOption);
return SourceProvider.of(builder.build());
} else {
org.apache.inlong.sort.mongodb.MongoDBSource.Builder<RowData> builder =
org.apache.inlong.sort.mongodb.MongoDBSource.<RowData>builder()
.hosts(hosts)
- .deserializer(deserializer);
+ .deserializer(deserializer)
+ .metricOption(metricOption);
Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
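
MongoDBTableSource threads metricOption through with the same null-tolerant Optional idiom it already uses for the other optional settings: call the builder setter only when the value is present. A tiny standalone illustration of that idiom (StringBuilder stands in for the real builder):

    import java.util.Optional;

    class OptionalWiringDemo {
        // Mirrors Optional.ofNullable(x).ifPresent(builder::setter) above.
        static void wire(StringBuilder builder, String maybeValue) {
            Optional.ofNullable(maybeValue).ifPresent(builder::append);
        }

        public static void main(String[] args) {
            StringBuilder b = new StringBuilder("labels=");
            wire(b, null);         // absent: setter not invoked
            wire(b, "groupId=g1"); // present: appended
            System.out.println(b); // prints labels=groupId=g1
        }
    }
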
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
index a95f238a0b..07b0d9f1cf 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.mongodb.source;
+import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
import com.ververica.cdc.connectors.base.options.StartupOptions;
@@ -54,6 +55,7 @@ public class MongoDBSourceBuilder<T> {
private final MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory();
private DebeziumDeserializationSchema<T> deserializer;
+ private MetricOption metricOption;
/** The comma-separated list of hostname and port pairs of mongodb servers. */
public MongoDBSourceBuilder<T> hosts(String hosts) {
@@ -189,6 +191,12 @@ public class MongoDBSourceBuilder<T> {
return this;
}
+ /** The metric option used to collect metrics when inlong.metric.labels is present in Flink SQL. */
+ public MongoDBSourceBuilder<T> metricOption(MetricOption metricOption) {
+ this.metricOption = metricOption;
+ return this;
+ }
+
/**
* Build the {@link MongoDBSource}.
*
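
For the incremental-snapshot path, MongoDBSourceBuilder gains the same hook. A sketch of the wiring that uses only the methods visible in this diff; how the builder instance and MetricOption are obtained is outside this excerpt, so both arrive as parameters:

    import org.apache.flink.table.data.RowData;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.mongodb.source.MongoDBSourceBuilder;

    class BuilderWiringSketch {
        // Passes the metric option to an already-obtained builder; hosts()
        // takes comma-separated host:port pairs (placeholder value below).
        static MongoDBSourceBuilder<RowData> wire(
                MongoDBSourceBuilder<RowData> builder, MetricOption metricOption) {
            return builder
                    .hosts("localhost:27017")
                    .metricOption(metricOption); // new in this commit
        }
    }
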