This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1c21152468 [INLONG-11340][Sort] Add new source metrics for
sort-connector-pulsar-v1.15 (#11341)
1c21152468 is described below
commit 1c21152468b61f34d06482917c0ec6518c0291f6
Author: PeterZh6 <[email protected]>
AuthorDate: Mon Oct 14 16:59:36 2024 +0800
[INLONG-11340][Sort] Add new source metrics for sort-connector-pulsar-v1.15
(#11341)
---
.../sort/base/metric/SourceExactlyMetric.java | 6 ++
.../source/reader/PulsarOrderedSourceReader.java | 69 +++++++++++++++++-----
.../source/reader/PulsarUnorderedSourceReader.java | 69 ++++++++++++++++------
.../table/PulsarTableDeserializationSchema.java | 63 +++++++++++++-------
4 files changed, 153 insertions(+), 54 deletions(-)
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
index a0b71c554c..a5252402c8 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceExactlyMetric.java
@@ -372,6 +372,12 @@ public class SourceExactlyMetric implements MetricData,
Serializable, SourceMetr
}
}
+ public void decNumDeserializeSuccess() {
+ if (numDeserializeSuccess != null) {
+ numDeserializeSuccess.dec();
+ }
+ }
+
public void incNumDeserializeError() {
if (numDeserializeError != null) {
numDeserializeError.inc();
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
index f535082ea2..3c75793f93 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarOrderedSourceReader.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.pulsar.source.reader;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;
import org.apache.flink.annotation.Internal;
@@ -70,6 +71,10 @@ public class PulsarOrderedSourceReader<OUT> extends
PulsarSourceReaderBase<OUT>
private final AtomicReference<Throwable> cursorCommitThrowable = new
AtomicReference<>();
private final PulsarDeserializationSchema<OUT> deserializationSchema;
private ScheduledExecutorService cursorScheduler;
+ private SourceExactlyMetric sourceExactlyMetric;
+
+ /** The map to store the start time of each checkpoint. */
+ private transient Map<Long, Long> checkpointStartTimeMap;
public PulsarOrderedSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>>
elementsQueue,
@@ -90,6 +95,12 @@ public class PulsarOrderedSourceReader<OUT> extends
PulsarSourceReaderBase<OUT>
this.cursorsToCommit = Collections.synchronizedSortedMap(new
TreeMap<>());
this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
this.deserializationSchema = deserializationSchema;
+ // get SourceExactlyMetric instance from deserializationSchema
+ if (deserializationSchema instanceof PulsarTableDeserializationSchema)
{
+ this.sourceExactlyMetric =
+ ((PulsarTableDeserializationSchema)
deserializationSchema).getSourceExactlyMetric();
+ }
+ this.checkpointStartTimeMap = new HashMap<>();
}
@Override
@@ -131,25 +142,40 @@ public class PulsarOrderedSourceReader<OUT> extends
PulsarSourceReaderBase<OUT>
@Override
public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
- if (deserializationSchema instanceof PulsarTableDeserializationSchema)
{
- ((PulsarTableDeserializationSchema)
deserializationSchema).updateCurrentCheckpointId(checkpointId);
- }
- List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);
+ try {
+ // record the start time of each checkpoint
+ if (checkpointStartTimeMap != null) {
+ checkpointStartTimeMap.put(checkpointId,
System.currentTimeMillis());
+ }
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumSnapshotCreate();
+ }
- // Perform a snapshot for these splits.
- Map<TopicPartition, MessageId> cursors =
- cursorsToCommit.computeIfAbsent(checkpointId, id -> new
HashMap<>());
- // Put the cursors of the active splits.
- for (PulsarPartitionSplit split : splits) {
- MessageId latestConsumedId = split.getLatestConsumedId();
- if (latestConsumedId != null) {
- cursors.put(split.getPartition(), latestConsumedId);
+ if (deserializationSchema instanceof
PulsarTableDeserializationSchema) {
+ ((PulsarTableDeserializationSchema)
deserializationSchema).updateCurrentCheckpointId(checkpointId);
}
- }
- // Put cursors of all the finished splits.
- cursors.putAll(cursorsOfFinishedSplits);
+ List<PulsarPartitionSplit> splits =
super.snapshotState(checkpointId);
+
+ // Perform a snapshot for these splits.
+ Map<TopicPartition, MessageId> cursors =
+ cursorsToCommit.computeIfAbsent(checkpointId, id -> new
HashMap<>());
+ // Put the cursors of the active splits.
+ for (PulsarPartitionSplit split : splits) {
+ MessageId latestConsumedId = split.getLatestConsumedId();
+ if (latestConsumedId != null) {
+ cursors.put(split.getPartition(), latestConsumedId);
+ }
+ }
+ // Put cursors of all the finished splits.
+ cursors.putAll(cursorsOfFinishedSplits);
- return splits;
+ return splits;
+ } catch (Exception e) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumSnapshotError();
+ }
+ throw e;
+ }
}
@Override
@@ -170,6 +196,17 @@ public class PulsarOrderedSourceReader<OUT> extends
PulsarSourceReaderBase<OUT>
pulsarTableDeserializationSchema.flushAudit();
pulsarTableDeserializationSchema.updateLastCheckpointId(checkpointId);
}
+ // get the start time of the currently completed checkpoint
+ if (checkpointStartTimeMap != null) {
+ Long snapShotStartTimeById =
checkpointStartTimeMap.remove(checkpointId);
+ if (snapShotStartTimeById != null && sourceExactlyMetric !=
null) {
+ sourceExactlyMetric.incNumSnapshotComplete();
+ sourceExactlyMetric
+
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() -
snapShotStartTimeById);
+ }
+ } else {
+ LOG.error("checkpointStartTimeMap is null, can't get the start
time of checkpoint");
+ }
} catch (Exception e) {
LOG.error("Failed to acknowledge cursors for checkpoint {}",
checkpointId, e);
cursorCommitThrowable.compareAndSet(null, e);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
index 2ccf74fe3a..adf15de0b1 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/source/reader/PulsarUnorderedSourceReader.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.pulsar.source.reader;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.inlong.sort.pulsar.table.PulsarTableDeserializationSchema;
import org.apache.flink.annotation.Internal;
@@ -41,6 +42,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
@@ -66,6 +68,11 @@ public class PulsarUnorderedSourceReader<OUT> extends
PulsarSourceReaderBase<OUT
private final PulsarDeserializationSchema<OUT> deserializationSchema;
private boolean started = false;
+ private SourceExactlyMetric sourceExactlyMetric;
+
+ /** The map to store the start time of each checkpoint. */
+ private transient Map<Long, Long> checkpointStartTimeMap;
+
public PulsarUnorderedSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>>
elementsQueue,
Supplier<PulsarUnorderedPartitionSplitReader<OUT>>
splitReaderSupplier,
@@ -86,6 +93,11 @@ public class PulsarUnorderedSourceReader<OUT> extends
PulsarSourceReaderBase<OUT
this.transactionsToCommit = Collections.synchronizedSortedMap(new
TreeMap<>());
this.transactionsOfFinishedSplits = Collections.synchronizedList(new
ArrayList<>());
this.deserializationSchema = deserializationSchema;
+ if (deserializationSchema instanceof PulsarTableDeserializationSchema)
{
+ this.sourceExactlyMetric =
+ ((PulsarTableDeserializationSchema)
deserializationSchema).getSourceExactlyMetric();
+ }
+ this.checkpointStartTimeMap = new HashMap<>();
}
@Override
@@ -141,26 +153,38 @@ public class PulsarUnorderedSourceReader<OUT> extends
PulsarSourceReaderBase<OUT
@Override
public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
- LOG.debug("Trigger the new transaction for downstream readers.");
- if (deserializationSchema instanceof PulsarTableDeserializationSchema)
{
- ((PulsarTableDeserializationSchema)
deserializationSchema).updateCurrentCheckpointId(checkpointId);
- }
- List<PulsarPartitionSplit> splits =
- ((PulsarUnorderedFetcherManager<OUT>)
splitFetcherManager).snapshotState();
+ try {
+ // record the start time of each checkpoint
+ if (checkpointStartTimeMap != null) {
+ checkpointStartTimeMap.put(checkpointId,
System.currentTimeMillis());
+ }
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumSnapshotCreate();
+ }
+ LOG.debug("Trigger the new transaction for downstream readers.");
+ if (deserializationSchema instanceof
PulsarTableDeserializationSchema) {
+ ((PulsarTableDeserializationSchema)
deserializationSchema).updateCurrentCheckpointId(checkpointId);
+ }
+ List<PulsarPartitionSplit> splits =
+ ((PulsarUnorderedFetcherManager<OUT>)
splitFetcherManager).snapshotState();
- if (coordinatorClient == null) {
- return splits;
- }
- // Snapshot the transaction status and commit it after checkpoint
finishing.
- List<TxnID> txnIDs =
- transactionsToCommit.computeIfAbsent(checkpointId, id -> new
ArrayList<>());
- for (PulsarPartitionSplit split : splits) {
- TxnID uncommittedTransactionId =
split.getUncommittedTransactionId();
- if (uncommittedTransactionId != null) {
- txnIDs.add(uncommittedTransactionId);
+ if (coordinatorClient == null) {
+ return splits;
+ }
+ // Snapshot the transaction status and commit it after checkpoint
finishing.
+ List<TxnID> txnIDs =
+ transactionsToCommit.computeIfAbsent(checkpointId, id ->
new ArrayList<>());
+ for (PulsarPartitionSplit split : splits) {
+ TxnID uncommittedTransactionId =
split.getUncommittedTransactionId();
+ if (uncommittedTransactionId != null) {
+ txnIDs.add(uncommittedTransactionId);
+ }
}
+ return splits;
+ } catch (Exception e) {
+ if (sourceExactlyMetric != null) { sourceExactlyMetric.incNumSnapshotError(); } // metric is optional; an NPE here would mask e
+ throw e;
}
- return splits;
}
@Override
@@ -188,5 +212,16 @@ public class PulsarUnorderedSourceReader<OUT> extends
PulsarSourceReaderBase<OUT
pulsarTableDeserializationSchema.flushAudit();
pulsarTableDeserializationSchema.updateLastCheckpointId(checkpointId);
}
+ // get the start time of the currently completed checkpoint
+ if (checkpointStartTimeMap != null) {
+ Long snapShotStartTimeById =
checkpointStartTimeMap.remove(checkpointId);
+ if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumSnapshotComplete();
+ sourceExactlyMetric
+
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() -
snapShotStartTimeById);
+ }
+ } else {
+ LOG.error("checkpointStartTimeMap is null, can't get the start
time of checkpoint");
+ }
}
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
index c05f485af6..5fef1d95c5 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java
@@ -86,7 +86,7 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
keyDeserialization.open(context);
}
if (metricOption != null) {
- sourceExactlyMetric = new SourceExactlyMetric(metricOption);
+ sourceExactlyMetric = new SourceExactlyMetric(metricOption,
context.getMetricGroup());
}
valueDeserialization.open(context);
}
@@ -94,27 +94,43 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
@Override
public void deserialize(Message<byte[]> message, Collector<RowData>
collector)
throws IOException {
- // Get the key row data
- List<RowData> keyRowData = new ArrayList<>();
- if (keyDeserialization != null) {
- keyDeserialization.deserialize(message.getKeyBytes(), new
ListCollector<>(keyRowData));
+ try {
+ long deserializeStartTime = System.currentTimeMillis();
+ // increase the number of deserialize success first, if
deserialize failed, decrease it
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumDeserializeSuccess();
+ }
+ // Get the key row data
+ List<RowData> keyRowData = new ArrayList<>();
+ if (keyDeserialization != null) {
+ keyDeserialization.deserialize(message.getKeyBytes(), new
ListCollector<>(keyRowData));
+ }
+
+ // Get the value row data
+ List<RowData> valueRowData = new ArrayList<>();
+
+ if (upsertMode && message.getData().length == 0) {
+ rowDataConverter.projectToRowWithNullValueRow(message,
keyRowData, collector);
+ return;
+ }
+
+ MetricsCollector<RowData> metricsCollector =
+ new MetricsCollector<>(collector, sourceExactlyMetric);
+
+ valueDeserialization.deserialize(message.getData(), new
ListCollector<>(valueRowData));
+
+ rowDataConverter.projectToProducedRowAndCollect(
+ message, keyRowData, valueRowData, metricsCollector);
+ if (sourceExactlyMetric != null) {
+
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() -
deserializeStartTime);
+ }
+ } catch (Exception e) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.incNumDeserializeError();
+ sourceExactlyMetric.decNumDeserializeSuccess();
+ }
+ throw e;
}
-
- // Get the value row data
- List<RowData> valueRowData = new ArrayList<>();
-
- if (upsertMode && message.getData().length == 0) {
- rowDataConverter.projectToRowWithNullValueRow(message, keyRowData,
collector);
- return;
- }
-
- MetricsCollector<RowData> metricsCollector =
- new MetricsCollector<>(collector, sourceExactlyMetric);
-
- valueDeserialization.deserialize(message.getData(), new
ListCollector<>(valueRowData));
-
- rowDataConverter.projectToProducedRowAndCollect(
- message, keyRowData, valueRowData, metricsCollector);
}
@Override
@@ -139,4 +155,9 @@ public class PulsarTableDeserializationSchema implements
PulsarDeserializationSc
sourceExactlyMetric.updateLastCheckpointId(checkpointId);
}
}
+
+ /** getter for PulsarSourceReader to record metrics */
+ public SourceExactlyMetric getSourceExactlyMetric() {
+ return sourceExactlyMetric;
+ }
}
\ No newline at end of file