This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bb08099d91 HIVE-29188: [hiveACIDReplication] Add src and tgt commit 
time in replication metrics for better monitoring (#6071)
4bb08099d91 is described below

commit 4bb08099d91acbefee73a449a36abb1ecd2b5925
Author: Shivam Kumar <67912385+shivao...@users.noreply.github.com>
AuthorDate: Tue Sep 16 08:15:05 2025 +0530

    HIVE-29188: [hiveACIDReplication] Add src and tgt commit time in 
replication metrics for better monitoring (#6071)
    
    * Details:
      * Currently, while Hive ACID replication is running, there is no
        visibility into how far it has progressed.
      * Even when replication is not running, there is no way to tell how far
        the target lags behind the source.
      * This commit adds the source and target commit times to
        replication_metrics so that this lag can be monitored.
    
    Co-authored-by: shivam02 <shivam.ku...@cloudera.com>
---
 .../ql/parse/repl/load/message/AbortTxnHandler.java   | 14 ++++++++++++++
 .../ql/parse/repl/load/message/CommitTxnHandler.java  | 14 ++++++++++++++
 .../parse/repl/metric/ReplicationMetricCollector.java | 16 ++++++++++++++++
 .../hadoop/hive/ql/parse/repl/metric/event/Stage.java | 19 +++++++++++++++++++
 .../llap/replication_metrics_ingest.q.out             |  4 ++--
 5 files changed, 65 insertions(+), 2 deletions(-)

diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
index c92ef253de8..6cd3965757a 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
 import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -27,6 +28,8 @@
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * AbortTxnHandler
@@ -43,6 +46,17 @@ public List<Task<?>> handle(Context context)
 
     AbortTxnMessage msg = 
deserializer.getAbortTxnMessage(context.dmd.getPayload());
 
+    // Saving the timestamp of all write abort txn in metric 'progress' to 
calculate lag between src and tgt
+    List<Long> writeIds = msg.getWriteIds();
+    List<String> databases = Optional.ofNullable(msg.getDbsUpdated())
+                              .orElse(Collections.emptyList())
+                              .stream()
+                              .map(StringUtils::normalizeIdentifier)
+                              .toList();
+    if (databases.contains(context.dbName) && writeIds != null && 
!writeIds.isEmpty()) {
+      context.getMetricCollector().setSrcTimeInProgress(msg.getTimestamp());
+    }
+
     Task<ReplTxnWork> abortTxnTask = TaskFactory.get(
         new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName), 
context.dbName, null,
                 msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN, 
context.eventOnlyReplicationSpec(),
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
index 2224793a059..65329a8f0d3 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
@@ -20,6 +20,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -33,6 +34,8 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * CommitTxnHandler
@@ -54,6 +57,17 @@ public List<Task<?>> handle(Context context)
     String tableNamePrev = null;
     String tblName = null;
 
+    // Saving the timestamp of all write commit txn in metric 'progress' to 
calculate lag between src and tgt
+    List<Long> writeIds = msg.getWriteIds();
+    List<String> databases = Optional.ofNullable(msg.getDatabases())
+                              .orElse(Collections.emptyList())
+                              .stream()
+                              .map(StringUtils::normalizeIdentifier)
+                              .toList();
+    if (databases.contains(dbName) && writeIds != null && !writeIds.isEmpty()) 
{
+      context.getMetricCollector().setSrcTimeInProgress(msg.getTimestamp());
+    }
+
     ReplTxnWork work = new 
ReplTxnWork(HiveUtils.getReplPolicy(context.dbName), context.dbName,
                                        null, msg.getTxnId(), 
ReplTxnWork.OperationType.REPL_COMMIT_TXN,
                                         context.eventOnlyReplicationSpec(), 
context.getDumpDirectory(),
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
index 87990dbaafc..d6601aabf26 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
@@ -136,6 +136,22 @@ public void reportFailoverStart(String stageName, 
Map<String, Long> metricMap,
     }
   }
 
+  public void setSrcTimeInProgress(long endTimeOnSrc) throws SemanticException 
{
+    if (isEnabled) {
+      LOG.debug("Updating last commit time on src in progress as: {}", 
endTimeOnSrc);
+      Progress progress = replicationMetric.getProgress();
+      Stage stage = progress.getStageByName("REPL_LOAD");
+      if (stage == null) {
+        return;
+      }
+      stage.setEndTimeOnSrc(endTimeOnSrc);
+      stage.setEndTimeOnTgt(getCurrentTimeInMillis());
+      progress.addStage(stage);
+      replicationMetric.setProgress(progress);
+      metricCollector.addMetric(replicationMetric);
+    }
+  }
+
   public void reportStageEnd(String stageName, Status status, long lastReplId,
       SnapshotUtils.ReplSnapshotCount replSnapshotCount, ReplStatsTracker 
replStatsTracker) throws SemanticException {
     unRegisterMBeanSafe();
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
index 83df9f06eca..09bd15e8a04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
@@ -32,6 +32,8 @@ public class Stage {
   private Status status;
   private long startTime;
   private long endTime;
+  private long endTimeOnSrc;
+  private long endTimeOnTgt;
   private Map<String, Metric> metrics = new HashMap<>();
   private String errorLogPath;
   private SnapshotUtils.ReplSnapshotCount replSnapshotCount = new 
SnapshotUtils.ReplSnapshotCount();
@@ -58,6 +60,8 @@ public Stage(Stage stage) {
     this.errorLogPath = stage.errorLogPath;
     this.replSnapshotCount = stage.replSnapshotCount;
     this.replStats = stage.replStats;
+    this.endTimeOnSrc = stage.endTimeOnSrc;
+    this.endTimeOnTgt = stage.endTimeOnTgt;
   }
 
   public String getName() {
@@ -92,6 +96,21 @@ public void setEndTime(long endTime) {
     this.endTime = endTime;
   }
 
+  public long getEndTimeOnSrc() {
+    return endTimeOnSrc;
+  }
+
+  public void setEndTimeOnSrc(long endTimeOnSrc) {
+    this.endTimeOnSrc = endTimeOnSrc;
+  }
+
+  public long getEndTimeOnTgt() {
+    return endTimeOnTgt;
+  }
+
+  public void setEndTimeOnTgt(long endTimeOnTgt) {
+    this.endTimeOnTgt = endTimeOnTgt;
+  }
 
   public void addMetric(Metric metric) {
     this.metrics.put(metric.getName(), metric);
diff --git 
a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out 
b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
index 6fd00f4471a..ddf92089b7d 100644
--- a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
+++ b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
@@ -92,5 +92,5 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: sys@replication_metrics
 POSTHOOK: Input: sys@replication_metrics_orig
 #### A masked pattern was here ####
-repl1  1       
{"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null}
     
H4sIAAAAAAAA/22PwQ6CMBBE/2XPHOTKTSsmJojEwskQ02gDJKUl2+2J9N8tEohEb7sz83ayI1gS5CwkwCvGUs4hmqRGBuk+gha9DN4tLbLHsboUs/sHQCq7KbqLQOrXOveSsHtubp2qnJXnaz6BT4coNTHjNH3yZEioZfXRCpX7Q5b+EvGWiH0d6hENZqYpBLWQaKdUBCgHxbUYbGsW9MsID9lZ8LV/A7NIwGISAQAA
    {"status":"SUCCESS","stages":[{"name":"REPL [...]
-repl2  1       
{"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null}
      
H4sIAAAAAAAA/22PwQqDMBBE/yVnD/XqzUYLBbFS9VSkBF1UiImsm5Pk3xu1CtLedmb3zbAzm0iQmVjA8pLzOM+Zt1gtOOs1MyUGcLtnnCXv5BFG2/YPgFT0y+nFY6CaYx6AsK9PWbcy5cX9kS5gbRBBEddG0XpPmoTcpfUOqAivSfxL+GfCt5WrR9SY6DYT1LFAGSk9hjDKXIlx6vSOumgzcARB0KzVTkYgYZP2y7hfpy3EVvYDvpfiNy0BAAA=
        {"status [...]
+repl1  1       
{"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null}
     
H4sIAAAAAAAA/22PsQ6CMBRF/6Uzg6xsWjExQSC2TIaYBhsgKS15fZ1I/90iwYi6tefe0/c6EYsCnSUJYRWlKWMkmlErA7pNRItBhuyaltn9WF3KJf0jAPJ+ru4iIvXj+1xoBs0W8BZfYJAIfbOZdqpyys9FPj/dOACpkRqnlz4aFGq9+ugt8f0hS3+NeGvEvg47ABjITFsK7EiinVIRATkqpsVoO7OqH0H4sl2Ar/0T5NGeBTQBAAA=
        {"status":"SUCCESS","stages":[{ [...]
+repl2  1       
{"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null}
      
H4sIAAAAAAAA/22QQQuDMAyF/0vPHubV21YdDERl1dOQUVzQQW0lpifpf1/VOXDbLe8l30vIxEaSZEcWMVFxngjBgtlqwVu3iWnZg+9dkyK9p/kxXrt/AKTyOY8eAgb68V3nWmCzN8qWFqMHwmez23auMl5e8myObiwiaOLG6nWeDEm1SRd8oPJ4SpNfItwToav9DYgGU9MWkjoWaatUwBAGJbQcxs5sqI+2PUeQBI9ltZcxKFilezP+G+Ma4mr3Atju2TJPA
 [...]

Reply via email to