This is an automated email from the ASF dual-hosted git repository. tchoi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 4bb08099d91 HIVE-29188: [hiveACIDReplication] Add src and tgt commit time in replication metrics for better monitoring (#6071) 4bb08099d91 is described below commit 4bb08099d91acbefee73a449a36abb1ecd2b5925 Author: Shivam Kumar <67912385+shivao...@users.noreply.github.com> AuthorDate: Tue Sep 16 08:15:05 2025 +0530 HIVE-29188: [hiveACIDReplication] Add src and tgt commit time in replication metrics for better monitoring (#6071) * Details: * Currently, while Hive ACID replication is running, there is no visibility into how far it has progressed. * Even when replication is not running, there is no way to tell how far the target lags behind the source. * This commit adds this information to replication_metrics. Co-authored-by: shivam02 <shivam.ku...@cloudera.com> --- .../ql/parse/repl/load/message/AbortTxnHandler.java | 14 ++++++++++++++ .../ql/parse/repl/load/message/CommitTxnHandler.java | 14 ++++++++++++++ .../parse/repl/metric/ReplicationMetricCollector.java | 16 ++++++++++++++++ .../hadoop/hive/ql/parse/repl/metric/event/Stage.java | 19 +++++++++++++++++++ .../llap/replication_metrics_ingest.q.out | 4 ++-- 5 files changed, 65 insertions(+), 2 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java index c92ef253de8..6cd3965757a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.parse.repl.load.message; import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage; +import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.plan.ReplTxnWork; import org.apache.hadoop.hive.ql.exec.Task; import 
org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -27,6 +28,8 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; /** * AbortTxnHandler @@ -43,6 +46,17 @@ public List<Task<?>> handle(Context context) AbortTxnMessage msg = deserializer.getAbortTxnMessage(context.dmd.getPayload()); + // Saving the timestamp of all write abort txn in metric 'progress' to calculate lag between src and tgt + List<Long> writeIds = msg.getWriteIds(); + List<String> databases = Optional.ofNullable(msg.getDbsUpdated()) + .orElse(Collections.emptyList()) + .stream() + .map(StringUtils::normalizeIdentifier) + .toList(); + if (databases.contains(context.dbName) && writeIds != null && !writeIds.isEmpty()) { + context.getMetricCollector().setSrcTimeInProgress(msg.getTimestamp()); + } + Task<ReplTxnWork> abortTxnTask = TaskFactory.get( new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName), context.dbName, null, msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN, context.eventOnlyReplicationSpec(), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java index 2224793a059..65329a8f0d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.WriteEventInfo; import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; +import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; import org.apache.hadoop.hive.ql.metadata.Table; @@ -33,6 +34,8 @@ import java.util.ArrayList; import java.util.Collections; import 
java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; /** * CommitTxnHandler @@ -54,6 +57,17 @@ public List<Task<?>> handle(Context context) String tableNamePrev = null; String tblName = null; + // Saving the timestamp of all write commit txn in metric 'progress' to calculate lag between src and tgt + List<Long> writeIds = msg.getWriteIds(); + List<String> databases = Optional.ofNullable(msg.getDatabases()) + .orElse(Collections.emptyList()) + .stream() + .map(StringUtils::normalizeIdentifier) + .toList(); + if (databases.contains(dbName) && writeIds != null && !writeIds.isEmpty()) { + context.getMetricCollector().setSrcTimeInProgress(msg.getTimestamp()); + } + ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName), context.dbName, null, msg.getTxnId(), ReplTxnWork.OperationType.REPL_COMMIT_TXN, context.eventOnlyReplicationSpec(), context.getDumpDirectory(), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java index 87990dbaafc..d6601aabf26 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java @@ -136,6 +136,22 @@ public void reportFailoverStart(String stageName, Map<String, Long> metricMap, } } + public void setSrcTimeInProgress(long endTimeOnSrc) throws SemanticException { + if (isEnabled) { + LOG.debug("Updating last commit time on src in progress as: {}", endTimeOnSrc); + Progress progress = replicationMetric.getProgress(); + Stage stage = progress.getStageByName("REPL_LOAD"); + if (stage == null) { + return; + } + stage.setEndTimeOnSrc(endTimeOnSrc); + stage.setEndTimeOnTgt(getCurrentTimeInMillis()); + progress.addStage(stage); + replicationMetric.setProgress(progress); + metricCollector.addMetric(replicationMetric); + } + } + public void 
reportStageEnd(String stageName, Status status, long lastReplId, SnapshotUtils.ReplSnapshotCount replSnapshotCount, ReplStatsTracker replStatsTracker) throws SemanticException { unRegisterMBeanSafe(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java index 83df9f06eca..09bd15e8a04 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java @@ -32,6 +32,8 @@ public class Stage { private Status status; private long startTime; private long endTime; + private long endTimeOnSrc; + private long endTimeOnTgt; private Map<String, Metric> metrics = new HashMap<>(); private String errorLogPath; private SnapshotUtils.ReplSnapshotCount replSnapshotCount = new SnapshotUtils.ReplSnapshotCount(); @@ -58,6 +60,8 @@ public Stage(Stage stage) { this.errorLogPath = stage.errorLogPath; this.replSnapshotCount = stage.replSnapshotCount; this.replStats = stage.replStats; + this.endTimeOnSrc = stage.endTimeOnSrc; + this.endTimeOnTgt = stage.endTimeOnTgt; } public String getName() { @@ -92,6 +96,21 @@ public void setEndTime(long endTime) { this.endTime = endTime; } + public long getEndTimeOnSrc() { + return endTimeOnSrc; + } + + public void setEndTimeOnSrc(long endTimeOnSrc) { + this.endTimeOnSrc = endTimeOnSrc; + } + + public long getEndTimeOnTgt() { + return endTimeOnTgt; + } + + public void setEndTimeOnTgt(long endTimeOnTgt) { + this.endTimeOnTgt = endTimeOnTgt; + } public void addMetric(Metric metric) { this.metrics.put(metric.getName(), metric); diff --git a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out index 6fd00f4471a..ddf92089b7d 100644 --- a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out +++ 
b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out @@ -92,5 +92,5 @@ POSTHOOK: type: QUERY POSTHOOK: Input: sys@replication_metrics POSTHOOK: Input: sys@replication_metrics_orig #### A masked pattern was here #### -repl1 1 {"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22PwQ6CMBBE/2XPHOTKTSsmJojEwskQ02gDJKUl2+2J9N8tEohEb7sz83ayI1gS5CwkwCvGUs4hmqRGBuk+gha9DN4tLbLHsboUs/sHQCq7KbqLQOrXOveSsHtubp2qnJXnaz6BT4coNTHjNH3yZEioZfXRCpX7Q5b+EvGWiH0d6hENZqYpBLWQaKdUBCgHxbUYbGsW9MsID9lZ8LV/A7NIwGISAQAA {"status":"SUCCESS","stages":[{"name":"REPL [...] -repl2 1 {"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22PwQqDMBBE/yVnD/XqzUYLBbFS9VSkBF1UiImsm5Pk3xu1CtLedmb3zbAzm0iQmVjA8pLzOM+Zt1gtOOs1MyUGcLtnnCXv5BFG2/YPgFT0y+nFY6CaYx6AsK9PWbcy5cX9kS5gbRBBEddG0XpPmoTcpfUOqAivSfxL+GfCt5WrR9SY6DYT1LFAGSk9hjDKXIlx6vSOumgzcARB0KzVTkYgYZP2y7hfpy3EVvYDvpfiNy0BAAA= {"status [...] +repl1 1 {"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22PsQ6CMBRF/6Uzg6xsWjExQSC2TIaYBhsgKS15fZ1I/90iwYi6tefe0/c6EYsCnSUJYRWlKWMkmlErA7pNRItBhuyaltn9WF3KJf0jAPJ+ru4iIvXj+1xoBs0W8BZfYJAIfbOZdqpyys9FPj/dOACpkRqnlz4aFGq9+ugt8f0hS3+NeGvEvg47ABjITFsK7EiinVIRATkqpsVoO7OqH0H4sl2Ar/0T5NGeBTQBAAA= {"status":"SUCCESS","stages":[{ [...] 
+repl2 1 {"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22QQQuDMAyF/0vPHubV21YdDERl1dOQUVzQQW0lpifpf1/VOXDbLe8l30vIxEaSZEcWMVFxngjBgtlqwVu3iWnZg+9dkyK9p/kxXrt/AKTyOY8eAgb68V3nWmCzN8qWFqMHwmez23auMl5e8myObiwiaOLG6nWeDEm1SRd8oPJ4SpNfItwToav9DYgGU9MWkjoWaatUwBAGJbQcxs5sqI+2PUeQBI9ltZcxKFilezP+G+Ma4mr3Atju2TJPA [...]