This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9348626926 [INLONG-11357][Sort] Add new source metrics for sort-connector-sqlserver-cdc-v1.15 (#11358)
9348626926 is described below
commit 9348626926f1354b5b7e994926a9254d32136f77
Author: PeterZh6 <[email protected]>
AuthorDate: Wed Oct 16 11:26:11 2024 +0800
[INLONG-11357][Sort] Add new source metrics for sort-connector-sqlserver-cdc-v1.15 (#11358)
---
.../sort/sqlserver/DebeziumSourceFunction.java | 70 +++++++++++++++++----
.../RowDataDebeziumDeserializeSchema.java | 71 ++++++++++++++--------
.../inlong/sort/sqlserver/SqlServerSource.java | 11 +++-
.../sort/sqlserver/SqlServerTableSource.java | 1 +
4 files changed, 113 insertions(+), 40 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
index 01118d6513..c480ad1d45 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
@@ -17,6 +17,9 @@
package org.apache.inlong.sort.sqlserver;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
+
import com.ververica.cdc.debezium.Validator;
import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
@@ -61,6 +64,8 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -199,17 +204,24 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
/** Buffer the events from the source and record the errors from the
debezium. */
private transient Handover handover;
+ private transient SourceExactlyMetric sourceExactlyMetric;
+
+ private final MetricOption metricOption;
+
+ private transient Map<Long, Long> checkpointStartTimeMap;
+
//
---------------------------------------------------------------------------------------
public DebeziumSourceFunction(
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@Nullable DebeziumOffset specificOffset,
- Validator validator) {
+ Validator validator, MetricOption metricOption) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
+ this.metricOption = metricOption;
}
@Override
@@ -222,6 +234,14 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
+ if (metricOption != null) {
+ sourceExactlyMetric = new SourceExactlyMetric(metricOption,
getRuntimeContext().getMetricGroup());
+ }
+ if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+ ((RowDataDebeziumDeserializeSchema) deserializer)
+ .setSourceExactlyMetric(sourceExactlyMetric);
+ }
+ this.checkpointStartTimeMap = new HashMap<>();
}
// ------------------------------------------------------------------------
@@ -306,17 +326,33 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
throws Exception {
- if (handover.hasError()) {
- LOG.debug("snapshotState() called on closed source");
- throw new FlinkRuntimeException(
- "Call snapshotState() on closed source, checkpoint
failed.");
- } else {
- snapshotOffsetState(functionSnapshotContext.getCheckpointId());
- snapshotHistoryRecordsState();
- }
- if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
- ((RowDataDebeziumDeserializeSchema) deserializer)
-
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+ try {
+ if (handover.hasError()) {
+ LOG.debug("snapshotState() called on closed source");
+ throw new FlinkRuntimeException(
+ "Call snapshotState() on closed source, checkpoint
failed.");
+ } else {
+ snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+ snapshotHistoryRecordsState();
+ }
+ if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+ ((RowDataDebeziumDeserializeSchema) deserializer)
+
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+ }
+ if (checkpointStartTimeMap != null) {
+
checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(),
System.currentTimeMillis());
+ } else {
+ LOG.error("checkpointStartTimeMap is null, can't record the
start time of checkpoint");
+ }
+
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumSnapshotCreate();
+ }
+ } catch (Exception e) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumSnapshotCreate();
+ }
+ throw e;
}
}
@@ -498,6 +534,16 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
schema.flushAudit();
schema.updateLastCheckpointId(checkpointId);
}
+ if (checkpointStartTimeMap != null) {
+ Long snapShotStartTimeById =
checkpointStartTimeMap.remove(checkpointId);
+ if (snapShotStartTimeById != null && sourceExactlyMetric !=
null) {
+ sourceExactlyMetric.incNumSnapshotComplete();
+ sourceExactlyMetric.recordSnapshotToCheckpointDelay(
+ System.currentTimeMillis() -
snapShotStartTimeById);
+ }
+ } else {
+ LOG.error("checkpointStartTimeMap is null, can't get the start
time of checkpoint");
+ }
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
index d90f470513..394ee0297b 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
@@ -139,37 +139,49 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
@Override
public void deserialize(SourceRecord record, Collector<RowData> out)
throws Exception {
- Envelope.Operation op = Envelope.operationFor(record);
- Struct value = (Struct) record.value();
- Schema valueSchema = record.valueSchema();
- if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
- GenericRowData insert = extractAfterRow(value, valueSchema);
- validator.validate(insert, RowKind.INSERT);
- insert.setRowKind(RowKind.INSERT);
- if (sourceExactlyMetric != null) {
- out = new MetricsCollector<>(out, sourceExactlyMetric);
+ long deserializeStartTime = System.currentTimeMillis();
+ try {
+ Envelope.Operation op = Envelope.operationFor(record);
+ Struct value = (Struct) record.value();
+ Schema valueSchema = record.valueSchema();
+ if (op == Envelope.Operation.CREATE || op ==
Envelope.Operation.READ) {
+ GenericRowData insert = extractAfterRow(value, valueSchema);
+ validator.validate(insert, RowKind.INSERT);
+ insert.setRowKind(RowKind.INSERT);
+ if (sourceExactlyMetric != null) {
+ out = new MetricsCollector<>(out, sourceExactlyMetric);
+ }
+ emit(record, insert, out);
+ } else if (op == Envelope.Operation.DELETE) {
+ GenericRowData delete = extractBeforeRow(value, valueSchema);
+ validator.validate(delete, RowKind.DELETE);
+ delete.setRowKind(RowKind.DELETE);
+ emit(record, delete, out);
+ } else {
+ if (changelogMode == DebeziumChangelogMode.ALL) {
+ GenericRowData before = extractBeforeRow(value,
valueSchema);
+ validator.validate(before, RowKind.UPDATE_BEFORE);
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ emit(record, before, out);
+ }
+
+ GenericRowData after = extractAfterRow(value, valueSchema);
+ validator.validate(after, RowKind.UPDATE_AFTER);
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ if (sourceExactlyMetric != null) {
+ out = new MetricsCollector<>(out, sourceExactlyMetric);
+ }
+ emit(record, after, out);
}
- emit(record, insert, out);
- } else if (op == Envelope.Operation.DELETE) {
- GenericRowData delete = extractBeforeRow(value, valueSchema);
- validator.validate(delete, RowKind.DELETE);
- delete.setRowKind(RowKind.DELETE);
- emit(record, delete, out);
- } else {
- if (changelogMode == DebeziumChangelogMode.ALL) {
- GenericRowData before = extractBeforeRow(value, valueSchema);
- validator.validate(before, RowKind.UPDATE_BEFORE);
- before.setRowKind(RowKind.UPDATE_BEFORE);
- emit(record, before, out);
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumDeserializeSuccess();
+
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() -
deserializeStartTime);
}
-
- GenericRowData after = extractAfterRow(value, valueSchema);
- validator.validate(after, RowKind.UPDATE_AFTER);
- after.setRowKind(RowKind.UPDATE_AFTER);
+ } catch (Exception e) {
if (sourceExactlyMetric != null) {
- out = new MetricsCollector<>(out, sourceExactlyMetric);
+ sourceExactlyMetric.incNumDeserializeError();
}
- emit(record, after, out);
+ throw e;
}
}
@@ -697,4 +709,9 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
sourceExactlyMetric.updateLastCheckpointId(checkpointId);
}
}
+
+ /** allow DebeziumSourceFunction to set the SourceExactlyMetric */
+ public void setSourceExactlyMetric(SourceExactlyMetric
sourceExactlyMetric) {
+ this.sourceExactlyMetric = sourceExactlyMetric;
+ }
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java
index 6a094521a5..92353bf0cf 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerSource.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.sqlserver;
+import org.apache.inlong.sort.base.metric.MetricOption;
+
import com.ververica.cdc.connectors.sqlserver.SqlServerValidator;
import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import io.debezium.connector.sqlserver.SqlServerConnector;
@@ -51,6 +53,7 @@ public class SqlServerSource {
private Properties dbzProperties;
private StartupOptions startupOptions = StartupOptions.initial();
private DebeziumDeserializationSchema<T> deserializer;
+ private MetricOption metricOption;
public Builder<T> hostname(String hostname) {
this.hostname = hostname;
@@ -114,6 +117,12 @@ public class SqlServerSource {
return this;
}
+ /** metricOption used to instantiate SourceExactlyMetric when
inlong.metric.labels is present in flink sql */
+ public Builder<T> metricOption(MetricOption metricOption) {
+ this.metricOption = metricOption;
+ return this;
+ }
+
public DebeziumSourceFunction<T> build() {
Properties props = new Properties();
props.setProperty("connector.class",
SqlServerConnector.class.getCanonicalName());
@@ -154,7 +163,7 @@ public class SqlServerSource {
}
return new DebeziumSourceFunction<>(
- deserializer, props, null, new SqlServerValidator(props));
+ deserializer, props, null, new SqlServerValidator(props),
metricOption);
}
}
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
index c49dd9747a..87defcedca 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
@@ -144,6 +144,7 @@ public class SqlServerTableSource implements
ScanTableSource, SupportsReadingMet
.debeziumProperties(dbzProperties)
.startupOptions(startupOptions)
.deserializer(deserializer)
+ .metricOption(metricOption)
.build();
return SourceFunctionProvider.of(sourceFunction, false);
}