This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 727475ff8 [INLONG-7595][Sort] Mongo read phase metrics need to update when no incremental data (#7606)
727475ff8 is described below
commit 727475ff8a36f4b04b3eff694ddf9d6dc74f2c49
Author: Xin Gong <[email protected]>
AuthorDate: Wed Mar 15 14:32:27 2023 +0800
    [INLONG-7595][Sort] Mongo read phase metrics need to update when no incremental data (#7606)
---
.../sort/cdc/mongodb/DebeziumSourceFunction.java | 32 ++++++++++++++++------
.../debezium/DebeziumDeserializationSchema.java | 8 +++++-
.../debezium/internal/DebeziumChangeFetcher.java | 23 +++++++++-------
.../MongoDBConnectorDeserializationSchema.java | 5 ++++
4 files changed, 49 insertions(+), 19 deletions(-)
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 1a763a16c..b835b584c 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -19,8 +19,10 @@ package org.apache.inlong.sort.cdc.mongodb;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
+import com.ververica.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
 import com.ververica.cdc.debezium.Validator;
-import io.debezium.connector.SnapshotRecord;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
 import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
 import io.debezium.embedded.Connect;
@@ -50,6 +52,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.enums.ReadPhase;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
@@ -501,17 +504,30 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
                 @Override
                 public void deserialize(SourceRecord record, Collector<T> out) throws Exception {
+                    // do nothing
+                }
+
+                @Override
+                public void deserialize(SourceRecord record, Collector<T> out, Boolean isStreamingPhase)
+                        throws Exception {
+                    if (record != null && MongoRecordUtils.isHeartbeatEvent(record)) {
+                        if (sourceMetricData != null && isStreamingPhase) {
+                            sourceMetricData.outputReadPhaseMetrics(ReadPhase.INCREASE_PHASE);
+                        }
+                        return;
+                    }
                     if (sourceMetricData != null && record != null && migrateAll) {
                         Struct value = (Struct) record.value();
-                        Struct source = value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
-                        if (null == source) {
-                            source = value.getStruct(RecordUtils.DOCUMENT_TO_FIELD);
+                        Struct ns = value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
+                        if (null == ns) {
+                            ns = value.getStruct(RecordUtils.DOCUMENT_TO_FIELD);
                         }
-                        String dbName = source.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
+                        String dbName = ns.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
                         String collectionName =
-                                source.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
-                        SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source);
-                        boolean isSnapshotRecord = (SnapshotRecord.TRUE == snapshotRecord);
+                                ns.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
+                        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+                        String snapshotRecord = source.getString(AbstractSourceInfo.SNAPSHOT_KEY);
+                        boolean isSnapshotRecord = Boolean.parseBoolean(snapshotRecord);
                         sourceMetricData
                                 .outputMetricsWithEstimate(new String[]{dbName, collectionName},
                                         isSnapshotRecord, value);
                 }
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumDeserializationSchema.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumDeserializationSchema.java
index be0b8a527..e60d65640 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumDeserializationSchema.java
@@ -17,12 +17,13 @@
 
 package org.apache.inlong.sort.cdc.mongodb.debezium;
 
-import java.io.Serializable;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.util.Collector;
 import org.apache.kafka.connect.source.SourceRecord;
 
+import java.io.Serializable;
+
 /**
  * The deserialization schema describes how to turn the Debezium SourceRecord into data types
  * (Java/Scala objects) that are processed by Flink.
@@ -37,4 +38,9 @@ public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTy
      */
     void deserialize(SourceRecord record, Collector<T> out) throws Exception;
 
+    /**
+     * Deserialize the Debezium record, it is represented in Kafka {@link SourceRecord}.
+     */
+    void deserialize(SourceRecord record, Collector<T> out, Boolean isStreamingPhase) throws Exception;
+
 }
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/DebeziumChangeFetcher.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/DebeziumChangeFetcher.java
index 3ffed20ec..3bbefa064 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/DebeziumChangeFetcher.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/DebeziumChangeFetcher.java
@@ -21,10 +21,6 @@ import io.debezium.connector.SnapshotRecord;
 import io.debezium.data.Envelope;
 import io.debezium.engine.ChangeEvent;
 import io.debezium.engine.DebeziumEngine;
-import java.util.ArrayDeque;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -36,6 +32,11 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
 /**
  * A Handler that convert change messages from {@link DebeziumEngine} to data in Flink. Considering
  * Debezium in different mode has different strategies to hold the lock, e.g. snapshot, the handler
@@ -150,9 +151,9 @@ public class DebeziumChangeFetcher<T> {
                 synchronized (checkpointLock) {
                     LOG.info(
                             "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.");
-                    handleBatch(events);
+                    handleBatch(events, false);
                     while (isRunning && isInDbSnapshotPhase) {
-                        handleBatch(handover.pollNext());
+                        handleBatch(handover.pollNext(), false);
                     }
                 }
                 LOG.info("Received record from streaming binlog phase, released checkpoint lock.");
@@ -162,7 +163,7 @@
             while (isRunning) {
                 // If the handover is closed or has errors, exit.
                 // If there is no streaming phase, the handover will be closed by the engine.
-                handleBatch(handover.pollNext());
+                handleBatch(handover.pollNext(), true);
             }
         } catch (Handover.ClosedException e) {
             // ignore
@@ -206,7 +207,7 @@
     // Helper
     // ---------------------------------------------------------------------------------------
 
-    private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents)
+    private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents, boolean isStreamingPhase)
             throws Exception {
         if (CollectionUtils.isEmpty(changeEvents)) {
             return;
@@ -224,11 +225,13 @@
                     debeziumOffset.setSourcePartition(record.sourcePartition());
                     debeziumOffset.setSourceOffset(record.sourceOffset());
                 }
+            }
+            deserialization.deserialize(record, debeziumCollector, isStreamingPhase);
+
+            if (isHeartbeatEvent(record)) {
                 // drop heartbeat events
                 continue;
             }
-            deserialization.deserialize(record, debeziumCollector);
-
             if (!isSnapshotRecord(record)) {
                 LOG.debug("Snapshot phase finishes.");
                 isInDbSnapshotPhase = false;
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
index dc1f71430..3ef453545 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
@@ -222,6 +222,11 @@ public class MongoDBConnectorDeserializationSchema
         }
     }
 
+    @Override
+    public void deserialize(SourceRecord record, Collector<RowData> out, Boolean isStreamingPhase) throws Exception {
+        this.deserialize(record, out);
+    }
+
     private GenericRowData extractMongoDMLData(Struct value, String keyFiled, String ddlType) {
         Struct documentTo = (Struct) value.get(keyFiled);
         String newDb = documentTo.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);