This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f20777fdb9 HIVE-29440: [hiveACIDRepl] Add dumpIDs to skipped 
LOAD/DUMP items (#6296)
9f20777fdb9 is described below

commit 9f20777fdb91514337c2f8b1836eb6416d34af56
Author: Shivam Kumar <[email protected]>
AuthorDate: Wed Feb 11 10:27:00 2026 +0530

    HIVE-29440: [hiveACIDRepl] Add dumpIDs to skipped LOAD/DUMP items 
(#6296)
---
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 19 +++++++--
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   | 10 +++++
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java | 45 +++++++++++++++++++++-
 .../repl/metric/ReplicationMetricCollector.java    | 32 +++++++++++++--
 .../hive/ql/parse/repl/metric/event/Stage.java     | 10 +++++
 .../ql/parse/repl/metric/event/StageMapper.java    | 30 +++++++++++++++
 .../llap/replication_metrics_ingest.q.out          |  4 +-
 7 files changed, 140 insertions(+), 10 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 2390984cce4..d6bb2eb7949 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -94,7 +94,6 @@
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
-import 
org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata.ReplicationType;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -342,8 +341,10 @@ else if (work.isBootstrap()) {
           } else {
             LOG.info("Previous Dump is not yet loaded. Skipping this 
iteration.");
           }
-          ReplUtils.reportStatusInReplicationMetrics(getName(), 
Status.SKIPPED, null, conf,
-                  work.dbNameOrPattern, work.isBootstrap() ? 
ReplicationType.BOOTSTRAP: ReplicationType.INCREMENTAL);
+          // Saving the executionId of last successful dump to be used in UI
+          DumpMetaData lastWrittenDmd = new 
DumpMetaData(previousValidHiveDumpPath, conf);
+          ReplUtils.reportStatusInReplicationMetricsWithLastExecutionId(
+                  getName(), Status.SKIPPED, 
lastWrittenDmd.getDumpExecutionId(), null, conf);
         }
       }
     } catch (RuntimeException e) {
@@ -940,6 +941,12 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData 
dmd, Path cmRoot, Hive
     long estimatedNumEvents = 
evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,
         maxEventLimit);
     try {
+      // Persisting the dump metadata with the execution ID early allows 
concurrent LOAD operations to associate
+      // with this DUMP cycle. This file will be overwritten with the final 
metadata in the finally block.
+      long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 
0L);
+      dmd.setDump(DumpType.INCREMENTAL, null, null, cmRoot, executionId, 
false);
+      dmd.write(true);
+
       IncrementalDumpLogger replLogger =
           new IncrementalDumpLogger(dbName, dumpRoot.toString(), 
estimatedNumEvents, work.eventFrom, work.eventTo,
               maxEventLimit);
@@ -1364,6 +1371,12 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path 
cmRoot, Hive hiveDb)
         FileList extTableFileList = createTableFileList(dumpRoot, 
EximUtil.FILE_LIST_EXTERNAL, conf);
         FileList snapPathFileList = isSnapshotEnabled ? createTableFileList(
             SnapshotUtils.getSnapshotFileListPath(dumpRoot), 
EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT, conf) : null) {
+      // Persisting the dump metadata with the execution ID early allows 
concurrent LOAD operations to associate
+      // with this DUMP cycle. This file will be overwritten with the final 
metadata in the finally block.
+      long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 
0L);
+      dmd.setDump(DumpType.BOOTSTRAP, null, null, cmRoot, executionId, false);
+      dmd.write(true);
+
       ExportService exportService = new ExportService(conf);
       for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
         LOG.debug("Dumping db: " + dbName);
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 88ce870de9c..d60b15466ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -566,6 +566,16 @@ public static void reportStatusInReplicationMetrics(String 
stageName, Status sta
     metricCollector.reportStageEnd(stageName, status, errorLogPath);
   }
 
+  public static void 
reportStatusInReplicationMetricsWithLastExecutionId(String stageName, Status 
status,
+                                                                         long 
lastDumpId, String errorLogPath,
+                                                                         
HiveConf conf)
+          throws SemanticException {
+    ReplicationMetricCollector metricCollector =
+            new ReplicationMetricCollector(null, null, null, 0, conf) {};
+    metricCollector.reportStageStart(stageName, new HashMap<>());
+    metricCollector.reportStageEndWithLastExecutionId(stageName, status, 
errorLogPath, lastDumpId);
+  }
+
   public static boolean isErrorRecoverable(Throwable e) {
     int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
     return errorCode > ErrorMsg.GENERIC_ERROR.getErrorCode();
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index a43dc67fc83..a31be8bb206 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -370,8 +370,7 @@ private void analyzeReplLoad(ASTNode ast) throws 
SemanticException {
           
dmd.setOptimizedBootstrapToDumpMetadataFile(conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID,
 0L));
         }
       } else {
-        ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", 
Status.SKIPPED, null, conf,  sourceDbNameOrPattern, null);
-        LOG.warn("No dump to load or the previous dump already loaded");
+        handleSkippedLoad(latestDumpPath);
       }
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
@@ -379,6 +378,48 @@ private void analyzeReplLoad(ASTNode ast) throws 
SemanticException {
     }
   }
 
+  private void handleSkippedLoad(Path latestDumpPath) throws SemanticException 
{
+    Long executionId = extractExecutionIdFromDump(latestDumpPath);
+    reportSkippedLoadMetrics(executionId);
+    LOG.warn("No dump to load or the previous dump already loaded");
+  }
+
+  private Long extractExecutionIdFromDump(Path latestDumpPath) {
+    if (latestDumpPath == null) {
+      return null;
+    }
+
+    try {
+      Path metadataPath = new Path(latestDumpPath, 
ReplUtils.REPL_HIVE_BASE_DIR);
+      FileSystem fs = metadataPath.getFileSystem(conf);
+
+      if (fs.exists(metadataPath) && !fs.exists(new Path(metadataPath, 
LOAD_ACKNOWLEDGEMENT.toString()))) {
+        DumpMetaData lastWrittenDmd = new DumpMetaData(metadataPath, conf);
+        Long executionId = lastWrittenDmd.getDumpExecutionId();
+        LOG.debug("Retrieved execution ID {} from latest dump path", 
executionId);
+        return executionId;
+      } else {
+        LOG.debug("Metadata path does not exist: {}", metadataPath);
+      }
+    } catch (Exception e) {
+      LOG.warn("Unable to retrieve execution ID from dump metadata at {}: {}",
+              latestDumpPath, e.getMessage());
+      LOG.debug("Full exception:", e);
+    }
+
+    return null;
+  }
+
+  private void reportSkippedLoadMetrics(Long executionId) throws 
SemanticException {
+    if (executionId != null) {
+      ReplUtils.reportStatusInReplicationMetricsWithLastExecutionId(
+              "REPL_LOAD", Status.SKIPPED, executionId, null, conf);
+    } else {
+      ReplUtils.reportStatusInReplicationMetrics(
+              "REPL_LOAD", Status.SKIPPED, null, conf, sourceDbNameOrPattern, 
null);
+    }
+  }
+
   private ReplicationMetricCollector initReplicationLoadMetricCollector(String 
dumpDirectory, String dbNameToLoadIn,
                                                                         
DumpMetaData dmd) throws SemanticException {
     ReplicationMetricCollector collector;
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
index d6601aabf26..581a521c1d9 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
@@ -52,6 +52,7 @@ public abstract class ReplicationMetricCollector {
   private static boolean enableForTests;
   private static long scheduledExecutionIdForTests = 0L;
   private HiveConf conf;
+  private static final String STAGE_ENDED_LOG_FORMAT = "Stage Ended {}, {}";
 
   public void setMetricsMBean(ObjectName metricsMBean) {
     this.metricsMBean = metricsMBean;
@@ -156,7 +157,7 @@ public void reportStageEnd(String stageName, Status status, 
long lastReplId,
       SnapshotUtils.ReplSnapshotCount replSnapshotCount, ReplStatsTracker 
replStatsTracker) throws SemanticException {
     unRegisterMBeanSafe();
     if (isEnabled) {
-      LOG.debug("Stage ended {}, {}, {}", stageName, status, lastReplId );
+      LOG.debug("Stage ended {}, {}, {}", stageName, status, lastReplId);
       Progress progress = replicationMetric.getProgress();
       Stage stage = progress.getStageByName(stageName);
       if(stage == null){
@@ -183,10 +184,35 @@ public void reportStageEnd(String stageName, Status 
status, long lastReplId,
     }
   }
 
+  public void reportStageEndWithLastExecutionId(String stageName, Status 
status, String errorLogPath, long lastDumpId)
+          throws SemanticException {
+    unRegisterMBeanSafe();
+    if (isEnabled) {
+      LOG.debug(STAGE_ENDED_LOG_FORMAT, stageName, status);
+      Progress progress = replicationMetric.getProgress();
+      Stage stage = progress.getStageByName(stageName);
+      if(stage == null){
+        stage = new Stage(stageName, status, -1L);
+      }
+      stage.setStatus(status);
+      stage.setEndTime(getCurrentTimeInMillis());
+      stage.setLastSuccessfulDumpId(lastDumpId);
+      if (errorLogPath != null) {
+        stage.setErrorLogPath(errorLogPath);
+      }
+      progress.addStage(stage);
+      replicationMetric.setProgress(progress);
+      metricCollector.addMetric(replicationMetric);
+      if (Status.FAILED == status || Status.FAILED_ADMIN == status || 
Status.SKIPPED == status) {
+        reportEnd(status);
+      }
+    }
+  }
+
   public void reportStageEnd(String stageName, Status status, String 
errorLogPath) throws SemanticException {
     unRegisterMBeanSafe();
     if (isEnabled) {
-      LOG.debug("Stage Ended {}, {}", stageName, status );
+      LOG.debug(STAGE_ENDED_LOG_FORMAT, stageName, status);
       Progress progress = replicationMetric.getProgress();
       Stage stage = progress.getStageByName(stageName);
       if(stage == null){
@@ -209,7 +235,7 @@ public void reportStageEnd(String stageName, Status status, 
String errorLogPath)
   public void reportStageEnd(String stageName, Status status) throws 
SemanticException {
     unRegisterMBeanSafe();
     if (isEnabled) {
-      LOG.debug("Stage Ended {}, {}", stageName, status );
+      LOG.debug(STAGE_ENDED_LOG_FORMAT, stageName, status);
       Progress progress = replicationMetric.getProgress();
       Stage stage = progress.getStageByName(stageName);
       if(stage == null){
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
index 09bd15e8a04..1491a3ceae8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
@@ -34,6 +34,7 @@ public class Stage {
   private long endTime;
   private long endTimeOnSrc;
   private long endTimeOnTgt;
+  private long lastSuccessfulDumpId;
   private Map<String, Metric> metrics = new HashMap<>();
   private String errorLogPath;
   private SnapshotUtils.ReplSnapshotCount replSnapshotCount = new 
SnapshotUtils.ReplSnapshotCount();
@@ -62,6 +63,7 @@ public Stage(Stage stage) {
     this.replStats = stage.replStats;
     this.endTimeOnSrc = stage.endTimeOnSrc;
     this.endTimeOnTgt = stage.endTimeOnTgt;
+    this.lastSuccessfulDumpId = stage.lastSuccessfulDumpId;
   }
 
   public String getName() {
@@ -112,6 +114,14 @@ public void setEndTimeOnTgt(long endTimeOnTgt) {
     this.endTimeOnTgt = endTimeOnTgt;
   }
 
+  public long getLastSuccessfulDumpId() {
+    return lastSuccessfulDumpId;
+  }
+
+  public void setLastSuccessfulDumpId(long lastSuccessfulDumpId) {
+    this.lastSuccessfulDumpId = lastSuccessfulDumpId;
+  }
+
   public void addMetric(Metric metric) {
     this.metrics.put(metric.getName(), metric);
   }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java
index c78b04de25e..c5dee9ea1ec 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java
@@ -37,6 +37,12 @@ public class StageMapper {
 
   private long endTime = 0;
 
+  private long endTimeOnSrc = 0;
+
+  private long endTimeOnTgt = 0;
+
+  private long lastSuccessfulDumpId = 0;
+
   private List<Metric> metrics = new ArrayList<>();
 
   private String errorLogPath;
@@ -64,6 +70,30 @@ public long getEndTime() {
     return endTime;
   }
 
+  public long getEndTimeOnSrc() {
+    return endTimeOnSrc;
+  }
+
+  public void setEndTimeOnSrc(long endTimeOnSrc) {
+    this.endTimeOnSrc = endTimeOnSrc;
+  }
+
+  public long getEndTimeOnTgt() {
+    return endTimeOnTgt;
+  }
+
+  public void setEndTimeOnTgt(long endTimeOnTgt) {
+    this.endTimeOnTgt = endTimeOnTgt;
+  }
+
+  public long getLastSuccessfulDumpId() {
+    return lastSuccessfulDumpId;
+  }
+
+  public void setLastSuccessfulDumpId(long lastSuccessfulDumpId) {
+    this.lastSuccessfulDumpId = lastSuccessfulDumpId;
+  }
+
   public List<Metric> getMetrics() {
     return metrics;
   }
diff --git 
a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out 
b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
index ddf92089b7d..7d4a0035138 100644
--- a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
+++ b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
@@ -92,5 +92,5 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: sys@replication_metrics
 POSTHOOK: Input: sys@replication_metrics_orig
 #### A masked pattern was here ####
-repl1  1       
{"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null}
     
H4sIAAAAAAAA/22PsQ6CMBRF/6Uzg6xsWjExQSC2TIaYBhsgKS15fZ1I/90iwYi6tefe0/c6EYsCnSUJYRWlKWMkmlErA7pNRItBhuyaltn9WF3KJf0jAPJ+ru4iIvXj+1xoBs0W8BZfYJAIfbOZdqpyys9FPj/dOACpkRqnlz4aFGq9+ugt8f0hS3+NeGvEvg47ABjITFsK7EiinVIRATkqpsVoO7OqH0H4sl2Ar/0T5NGeBTQBAAA=
        {"status":"SUCCESS","stages":[{ [...]
-repl2  1       
{"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null}
      
H4sIAAAAAAAA/22QQQuDMAyF/0vPHubV21YdDERl1dOQUVzQQW0lpifpf1/VOXDbLe8l30vIxEaSZEcWMVFxngjBgtlqwVu3iWnZg+9dkyK9p/kxXrt/AKTyOY8eAgb68V3nWmCzN8qWFqMHwmez23auMl5e8myObiwiaOLG6nWeDEm1SRd8oPJ4SpNfItwToav9DYgGU9MWkjoWaatUwBAGJbQcxs5sqI+2PUeQBI9ltZcxKFilezP+G+Ma4mr3Atju2TJPA
 [...]
+repl1  1       
{"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null}
     
H4sIAAAAAAAA/22Qyw6CMBBF/6VrFrJlpwUTEhRiYWWIaUoFktKS6XRF+Hd5BCPqbnpuT3szA7HI0VkSEFZQGjFGvBnVckL3gWjeySm7RVnyCItLtqZ/BMC8na8ePCJ19T2nmoHYg7zGBShukTkhpLVPp0LX9XG1BJ1EaMWuxrm40jxOr/OfwgFIjdQ4vT6EBrnajqP3lvLjKYl+DX9v+GM5lQMwkJg649iQQDulPAKyV0zz3jZmUz+CaRd2BWM5vgCmU7uCTQEAAA==
        {"statu [...]
+repl2  1       
{"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null}
      
H4sIAAAAAAAA/22QMQvCMBSE/0vmDrq6aVpBKLaYdpIiIT6rkCbl5WWS/HdTo0LVLXeX73LkzhxJ8o6tmGg5L4Rg2WT1EK3jnRk5QMwORV2eymqdp/QPgNTcpquLjIE5f58rI1DNjaanp6GlI+GVAucuXud+GHfnZzAA4U3NZmzbPW921X56U3lEMMStN6mILEn9liH7QM16Uxa/xHJOLEMXxyFaLG1fS7qylfFaZwxh1MLI0V3tG43VfuAIkiBNjTIHDUmGF
 [...]

Reply via email to