This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c21152468 [INLONG-11340][Sort] Add new source metrics for sort-connector-pulsar-v1.15 (#11341)
1c21152468 is described below

commit 1c21152468b61f34d06482917c0ec6518c0291f6
Author: PeterZh6 <[email protected]>
AuthorDate: Mon Oct 14 16:59:36 2024 +0800

    [INLONG-11340][Sort] Add new source metrics for sort-connector-pulsar-v1.15 (#11341)
---
 .../sort/base/metric/SourceExactlyMetric.java      |  6 ++
 .../source/reader/PulsarOrderedSourceReader.java   | 69 +++++++++++++++++-----
 .../source/reader/PulsarUnorderedSourceReader.java | 69 ++++++++++++++++------
 .../table/PulsarTableDeserializationSchema.java    | 63 +++++++++++++-------
 4 files changed, 153 insertions(+), 54 deletions(-)

diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
index a0b71c554c..a5252402c8 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
@@ -372,6 +372,12 @@ public class SourceExactlyMetric implements MetricData, 
Serializable, SourceMetr
         }
     }
 
+    public void decNumDeserializeSuccess() {
+        if (numDeserializeSuccess != null) {
+            numDeserializeSuccess.dec();
+        }
+    }
+
     public void incNumDeserializeError() {
         if (numDeserializeError != null) {
             numDeserializeError.inc();
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
index f535082ea2..3c75793f93 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.pulsar.source.reader;
 
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;
 
 import org.apache.flink.annotation.Internal;
@@ -70,6 +71,10 @@ public class PulsarOrderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT>
     private final AtomicReference<Throwable> cursorCommitThrowable = new 
AtomicReference<>();
     private final PulsarDeserializationSchema<OUT> deserializationSchema;
     private ScheduledExecutorService cursorScheduler;
+    private SourceExactlyMetric sourceExactlyMetric;
+
+    /** The map to store the start time of each checkpoint. */
+    private transient Map<Long, Long> checkpointStartTimeMap;
 
     public PulsarOrderedSourceReader(
             
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> 
elementsQueue,
@@ -90,6 +95,12 @@ public class PulsarOrderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT>
         this.cursorsToCommit = Collections.synchronizedSortedMap(new 
TreeMap<>());
         this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
         this.deserializationSchema = deserializationSchema;
+        // get SourceExactlyMetric instance from deserializationSchema
+        if (deserializationSchema instanceof PulsarTableDeserializationSchema) 
{
+            this.sourceExactlyMetric =
+                    ((PulsarTableDeserializationSchema) 
deserializationSchema).getSourceExactlyMetric();
+        }
+        this.checkpointStartTimeMap = new HashMap<>();
     }
 
     @Override
@@ -131,25 +142,40 @@ public class PulsarOrderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT>
 
     @Override
     public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
-        if (deserializationSchema instanceof PulsarTableDeserializationSchema) 
{
-            ((PulsarTableDeserializationSchema) 
deserializationSchema).updateCurrentCheckpointId(checkpointId);
-        }
-        List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);
+        try {
+            // record the start time of each checkpoint
+            if (checkpointStartTimeMap != null) {
+                checkpointStartTimeMap.put(checkpointId, 
System.currentTimeMillis());
+            }
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotCreate();
+            }
 
-        // Perform a snapshot for these splits.
-        Map<TopicPartition, MessageId> cursors =
-                cursorsToCommit.computeIfAbsent(checkpointId, id -> new 
HashMap<>());
-        // Put the cursors of the active splits.
-        for (PulsarPartitionSplit split : splits) {
-            MessageId latestConsumedId = split.getLatestConsumedId();
-            if (latestConsumedId != null) {
-                cursors.put(split.getPartition(), latestConsumedId);
+            if (deserializationSchema instanceof 
PulsarTableDeserializationSchema) {
+                ((PulsarTableDeserializationSchema) 
deserializationSchema).updateCurrentCheckpointId(checkpointId);
             }
-        }
-        // Put cursors of all the finished splits.
-        cursors.putAll(cursorsOfFinishedSplits);
+            List<PulsarPartitionSplit> splits = 
super.snapshotState(checkpointId);
+
+            // Perform a snapshot for these splits.
+            Map<TopicPartition, MessageId> cursors =
+                    cursorsToCommit.computeIfAbsent(checkpointId, id -> new 
HashMap<>());
+            // Put the cursors of the active splits.
+            for (PulsarPartitionSplit split : splits) {
+                MessageId latestConsumedId = split.getLatestConsumedId();
+                if (latestConsumedId != null) {
+                    cursors.put(split.getPartition(), latestConsumedId);
+                }
+            }
+            // Put cursors of all the finished splits.
+            cursors.putAll(cursorsOfFinishedSplits);
 
-        return splits;
+            return splits;
+        } catch (Exception e) {
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotError();
+            }
+            throw e;
+        }
     }
 
     @Override
@@ -170,6 +196,17 @@ public class PulsarOrderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT>
                 pulsarTableDeserializationSchema.flushAudit();
                 
pulsarTableDeserializationSchema.updateLastCheckpointId(checkpointId);
             }
+            // get the start time of the currently completed checkpoint
+            if (checkpointStartTimeMap != null) {
+                Long snapShotStartTimeById = 
checkpointStartTimeMap.remove(checkpointId);
+                if (snapShotStartTimeById != null && sourceExactlyMetric != 
null) {
+                    sourceExactlyMetric.incNumSnapshotComplete();
+                    sourceExactlyMetric
+                            
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - 
snapShotStartTimeById);
+                }
+            } else {
+                LOG.error("checkpointStartTimeMap is null, can't get the start 
time of checkpoint");
+            }
         } catch (Exception e) {
             LOG.error("Failed to acknowledge cursors for checkpoint {}", 
checkpointId, e);
             cursorCommitThrowable.compareAndSet(null, e);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
index 2ccf74fe3a..adf15de0b1 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.pulsar.source.reader;
 
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;
 
 import org.apache.flink.annotation.Internal;
@@ -41,6 +42,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
@@ -66,6 +68,11 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
     private final PulsarDeserializationSchema<OUT> deserializationSchema;
     private boolean started = false;
 
+    private SourceExactlyMetric sourceExactlyMetric;
+
+    /** The map to store the start time of each checkpoint. */
+    private transient Map<Long, Long> checkpointStartTimeMap;
+
     public PulsarUnorderedSourceReader(
             
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> 
elementsQueue,
             Supplier<PulsarUnorderedPartitionSplitReader<OUT>> 
splitReaderSupplier,
@@ -86,6 +93,11 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
         this.transactionsToCommit = Collections.synchronizedSortedMap(new 
TreeMap<>());
         this.transactionsOfFinishedSplits = Collections.synchronizedList(new 
ArrayList<>());
         this.deserializationSchema = deserializationSchema;
+        if (deserializationSchema instanceof PulsarTableDeserializationSchema) 
{
+            this.sourceExactlyMetric =
+                    ((PulsarTableDeserializationSchema) 
deserializationSchema).getSourceExactlyMetric();
+        }
+        this.checkpointStartTimeMap = new HashMap<>();
     }
 
     @Override
@@ -141,26 +153,41 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
 
     @Override
     public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
-        LOG.debug("Trigger the new transaction for downstream readers.");
-        if (deserializationSchema instanceof PulsarTableDeserializationSchema) 
{
-            ((PulsarTableDeserializationSchema) 
deserializationSchema).updateCurrentCheckpointId(checkpointId);
-        }
-        List<PulsarPartitionSplit> splits =
-                ((PulsarUnorderedFetcherManager<OUT>) 
splitFetcherManager).snapshotState();
+        try {
+            // record the start time of each checkpoint
+            if (checkpointStartTimeMap != null) {
+                checkpointStartTimeMap.put(checkpointId, 
System.currentTimeMillis());
+            }
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotCreate();
+            }
+            LOG.debug("Trigger the new transaction for downstream readers.");
+            if (deserializationSchema instanceof 
PulsarTableDeserializationSchema) {
+                ((PulsarTableDeserializationSchema) 
deserializationSchema).updateCurrentCheckpointId(checkpointId);
+            }
+            List<PulsarPartitionSplit> splits =
+                    ((PulsarUnorderedFetcherManager<OUT>) 
splitFetcherManager).snapshotState();
 
-        if (coordinatorClient == null) {
-            return splits;
-        }
-        // Snapshot the transaction status and commit it after checkpoint 
finishing.
-        List<TxnID> txnIDs =
-                transactionsToCommit.computeIfAbsent(checkpointId, id -> new 
ArrayList<>());
-        for (PulsarPartitionSplit split : splits) {
-            TxnID uncommittedTransactionId = 
split.getUncommittedTransactionId();
-            if (uncommittedTransactionId != null) {
-                txnIDs.add(uncommittedTransactionId);
+            if (coordinatorClient == null) {
+                return splits;
+            }
+            // Snapshot the transaction status and commit it after checkpoint 
finishing.
+            List<TxnID> txnIDs =
+                    transactionsToCommit.computeIfAbsent(checkpointId, id -> 
new ArrayList<>());
+            for (PulsarPartitionSplit split : splits) {
+                TxnID uncommittedTransactionId = 
split.getUncommittedTransactionId();
+                if (uncommittedTransactionId != null) {
+                    txnIDs.add(uncommittedTransactionId);
+                }
             }
+            return splits;
+        } catch (Exception e) {
+            // metric stays null for non-table schemas; guard so an NPE cannot mask e
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotError();
+            }
+            throw e;
         }
-        return splits;
     }
 
     @Override
@@ -188,5 +212,16 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
             pulsarTableDeserializationSchema.flushAudit();
             
pulsarTableDeserializationSchema.updateLastCheckpointId(checkpointId);
         }
+        // get the start time of the currently completed checkpoint
+        if (checkpointStartTimeMap != null) {
+            Long snapShotStartTimeById = 
checkpointStartTimeMap.remove(checkpointId);
+            if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotComplete();
+                sourceExactlyMetric
+                        
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - 
snapShotStartTimeById);
+            }
+        } else {
+            LOG.error("checkpointStartTimeMap is null, can't get the start 
time of checkpoint");
+        }
     }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
index c05f485af6..5fef1d95c5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
@@ -86,7 +86,7 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
             keyDeserialization.open(context);
         }
         if (metricOption != null) {
-            sourceExactlyMetric = new SourceExactlyMetric(metricOption);
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption, 
context.getMetricGroup());
         }
         valueDeserialization.open(context);
     }
@@ -94,27 +94,43 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
     @Override
     public void deserialize(Message<byte[]> message, Collector<RowData> 
collector)
             throws IOException {
-        // Get the key row data
-        List<RowData> keyRowData = new ArrayList<>();
-        if (keyDeserialization != null) {
-            keyDeserialization.deserialize(message.getKeyBytes(), new 
ListCollector<>(keyRowData));
+        try {
+            long deserializeStartTime = System.currentTimeMillis();
+            // increase the number of deserialize success first, if 
deserialize failed, decrease it
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumDeserializeSuccess();
+            }
+            // Get the key row data
+            List<RowData> keyRowData = new ArrayList<>();
+            if (keyDeserialization != null) {
+                keyDeserialization.deserialize(message.getKeyBytes(), new 
ListCollector<>(keyRowData));
+            }
+
+            // Get the value row data
+            List<RowData> valueRowData = new ArrayList<>();
+
+            if (upsertMode && message.getData().length == 0) {
+                rowDataConverter.projectToRowWithNullValueRow(message, 
keyRowData, collector);
+                return;
+            }
+
+            MetricsCollector<RowData> metricsCollector =
+                    new MetricsCollector<>(collector, sourceExactlyMetric);
+
+            valueDeserialization.deserialize(message.getData(), new 
ListCollector<>(valueRowData));
+
+            rowDataConverter.projectToProducedRowAndCollect(
+                    message, keyRowData, valueRowData, metricsCollector);
+            if (sourceExactlyMetric != null) {
+                
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - 
deserializeStartTime);
+            }
+        } catch (Exception e) {
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumDeserializeError();
+                sourceExactlyMetric.decNumDeserializeSuccess();
+            }
+            throw e;
         }
-
-        // Get the value row data
-        List<RowData> valueRowData = new ArrayList<>();
-
-        if (upsertMode && message.getData().length == 0) {
-            rowDataConverter.projectToRowWithNullValueRow(message, keyRowData, 
collector);
-            return;
-        }
-
-        MetricsCollector<RowData> metricsCollector =
-                new MetricsCollector<>(collector, sourceExactlyMetric);
-
-        valueDeserialization.deserialize(message.getData(), new 
ListCollector<>(valueRowData));
-
-        rowDataConverter.projectToProducedRowAndCollect(
-                message, keyRowData, valueRowData, metricsCollector);
     }
 
     @Override
@@ -139,4 +155,9 @@ public class PulsarTableDeserializationSchema implements 
PulsarDeserializationSc
             sourceExactlyMetric.updateLastCheckpointId(checkpointId);
         }
     }
+
+    /** getter for PulsarSourceReader to record metrics */
+    public SourceExactlyMetric getSourceExactlyMetric() {
+        return sourceExactlyMetric;
+    }
 }
\ No newline at end of file

Reply via email to