This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9f20777fdb9 HIVE-29440: [hiveACIDRepl] Add dumpIDs to skipped LOAD/DUMP items (#6296)
9f20777fdb9 is described below
commit 9f20777fdb91514337c2f8b1836eb6416d34af56
Author: Shivam Kumar <[email protected]>
AuthorDate: Wed Feb 11 10:27:00 2026 +0530
HIVE-29440: [hiveACIDRepl] Add dumpIDs to skipped LOAD/DUMP items (#6296)
---
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 19 +++++++--
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 10 +++++
.../hive/ql/parse/ReplicationSemanticAnalyzer.java | 45 +++++++++++++++++++++-
.../repl/metric/ReplicationMetricCollector.java | 32 +++++++++++++--
.../hive/ql/parse/repl/metric/event/Stage.java | 10 +++++
.../ql/parse/repl/metric/event/StageMapper.java | 30 +++++++++++++++
.../llap/replication_metrics_ingest.q.out | 4 +-
7 files changed, 140 insertions(+), 10 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 2390984cce4..d6bb2eb7949 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -94,7 +94,6 @@
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
-import org.apache.hadoop.hive.ql.parse.repl.metric.event.Metadata.ReplicationType;
import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -342,8 +341,10 @@ else if (work.isBootstrap()) {
} else {
LOG.info("Previous Dump is not yet loaded. Skipping this
iteration.");
}
-          ReplUtils.reportStatusInReplicationMetrics(getName(), Status.SKIPPED, null, conf,
-              work.dbNameOrPattern, work.isBootstrap() ? ReplicationType.BOOTSTRAP: ReplicationType.INCREMENTAL);
+          // Saving the executionId of last successful dump to be used in UI
+          DumpMetaData lastWrittenDmd = new DumpMetaData(previousValidHiveDumpPath, conf);
+          ReplUtils.reportStatusInReplicationMetricsWithLastExecutionId(
+              getName(), Status.SKIPPED, lastWrittenDmd.getDumpExecutionId(), null, conf);
}
}
} catch (RuntimeException e) {
@@ -940,6 +941,12 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     long estimatedNumEvents = evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,
         maxEventLimit);
try {
+      // Persisting the dump metadata with the execution ID early allows concurrent LOAD operations to associate
+      // with this DUMP cycle. This file will be overwritten with the final metadata in the finally block.
+      long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
+      dmd.setDump(DumpType.INCREMENTAL, null, null, cmRoot, executionId, false);
+ dmd.write(true);
+
IncrementalDumpLogger replLogger =
          new IncrementalDumpLogger(dbName, dumpRoot.toString(), estimatedNumEvents, work.eventFrom, work.eventTo, maxEventLimit);
@@ -1364,6 +1371,12 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
        FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, conf);
        FileList snapPathFileList = isSnapshotEnabled ? createTableFileList(
            SnapshotUtils.getSnapshotFileListPath(dumpRoot), EximUtil.FILE_LIST_EXTERNAL_SNAPSHOT_CURRENT, conf) : null) {
+      // Persisting the dump metadata with the execution ID early allows concurrent LOAD operations to associate
+      // with this DUMP cycle. This file will be overwritten with the final metadata in the finally block.
+      long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
+ dmd.setDump(DumpType.BOOTSTRAP, null, null, cmRoot, executionId, false);
+ dmd.write(true);
+
ExportService exportService = new ExportService(conf);
for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
LOG.debug("Dumping db: " + dbName);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 88ce870de9c..d60b15466ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -566,6 +566,16 @@ public static void reportStatusInReplicationMetrics(String stageName, Status sta
metricCollector.reportStageEnd(stageName, status, errorLogPath);
}
+  public static void reportStatusInReplicationMetricsWithLastExecutionId(String stageName, Status status,
+                                                                         long lastDumpId, String errorLogPath,
+                                                                         HiveConf conf)
+      throws SemanticException {
+ ReplicationMetricCollector metricCollector =
+ new ReplicationMetricCollector(null, null, null, 0, conf) {};
+ metricCollector.reportStageStart(stageName, new HashMap<>());
+    metricCollector.reportStageEndWithLastExecutionId(stageName, status, errorLogPath, lastDumpId);
+ }
+
public static boolean isErrorRecoverable(Throwable e) {
int errorCode = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
return errorCode > ErrorMsg.GENERIC_ERROR.getErrorCode();
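A call site for the new helper mirrors the ReplDumpTask change earlier in this patch; roughly (a sketch, with 12345L standing in for the execution ID recovered from the previous dump's metadata):

    // Hypothetical call site: report a skipped cycle while carrying forward
    // the last successful dump's execution ID so the UI can correlate runs.
    ReplUtils.reportStatusInReplicationMetricsWithLastExecutionId(
        "REPL_DUMP", Status.SKIPPED, 12345L /* lastDumpId */, null /* errorLogPath */, conf);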
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index a43dc67fc83..a31be8bb206 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -370,8 +370,7 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
        dmd.setOptimizedBootstrapToDumpMetadataFile(conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L));
}
} else {
-        ReplUtils.reportStatusInReplicationMetrics("REPL_LOAD", Status.SKIPPED, null, conf, sourceDbNameOrPattern, null);
- LOG.warn("No dump to load or the previous dump already loaded");
+ handleSkippedLoad(latestDumpPath);
}
} catch (Exception e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
@@ -379,6 +378,48 @@ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
}
}
+  private void handleSkippedLoad(Path latestDumpPath) throws SemanticException {
+ Long executionId = extractExecutionIdFromDump(latestDumpPath);
+ reportSkippedLoadMetrics(executionId);
+ LOG.warn("No dump to load or the previous dump already loaded");
+ }
+
+ private Long extractExecutionIdFromDump(Path latestDumpPath) {
+ if (latestDumpPath == null) {
+ return null;
+ }
+
+ try {
+      Path metadataPath = new Path(latestDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+ FileSystem fs = metadataPath.getFileSystem(conf);
+
+      if (fs.exists(metadataPath) && !fs.exists(new Path(metadataPath, LOAD_ACKNOWLEDGEMENT.toString()))) {
+ DumpMetaData lastWrittenDmd = new DumpMetaData(metadataPath, conf);
+ Long executionId = lastWrittenDmd.getDumpExecutionId();
+ LOG.debug("Retrieved execution ID {} from latest dump path",
executionId);
+ return executionId;
+ } else {
+ LOG.debug("Metadata path does not exist: {}", metadataPath);
+ }
+ } catch (Exception e) {
+ LOG.warn("Unable to retrieve execution ID from dump metadata at {}: {}",
+ latestDumpPath, e.getMessage());
+ LOG.debug("Full exception:", e);
+ }
+
+ return null;
+ }
+
+  private void reportSkippedLoadMetrics(Long executionId) throws SemanticException {
+ if (executionId != null) {
+ ReplUtils.reportStatusInReplicationMetricsWithLastExecutionId(
+ "REPL_LOAD", Status.SKIPPED, executionId, null, conf);
+ } else {
+ ReplUtils.reportStatusInReplicationMetrics(
+ "REPL_LOAD", Status.SKIPPED, null, conf, sourceDbNameOrPattern,
null);
+ }
+ }
+
  private ReplicationMetricCollector initReplicationLoadMetricCollector(String dumpDirectory, String dbNameToLoadIn,
      DumpMetaData dmd) throws SemanticException {
ReplicationMetricCollector collector;
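The extractExecutionIdFromDump check above rests on a marker-file convention: a dump whose hive base directory exists but contains no LOAD_ACKNOWLEDGEMENT file has been dumped and not yet loaded, so its metadata still describes the pending cycle. Stripped of the Hadoop FileSystem plumbing, the predicate is (a sketch using java.nio and a hypothetical ack file name):

    import java.nio.file.Files;
    import java.nio.file.Path;

    public final class DumpStateCheck {
      // True when the dump's metadata dir exists but the acknowledgement
      // marker dropped by a successful REPL LOAD is absent.
      static boolean isDumpedButNotLoaded(Path hiveDumpDir, String ackFileName) {
        return Files.exists(hiveDumpDir)
            && !Files.exists(hiveDumpDir.resolve(ackFileName));
      }
    }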
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
index d6601aabf26..581a521c1d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/ReplicationMetricCollector.java
@@ -52,6 +52,7 @@ public abstract class ReplicationMetricCollector {
private static boolean enableForTests;
private static long scheduledExecutionIdForTests = 0L;
private HiveConf conf;
+ private static final String STAGE_ENDED_LOG_FORMAT = "Stage Ended {}, {}";
public void setMetricsMBean(ObjectName metricsMBean) {
this.metricsMBean = metricsMBean;
@@ -156,7 +157,7 @@ public void reportStageEnd(String stageName, Status status, long lastReplId,
       SnapshotUtils.ReplSnapshotCount replSnapshotCount, ReplStatsTracker replStatsTracker) throws SemanticException {
unRegisterMBeanSafe();
if (isEnabled) {
- LOG.debug("Stage ended {}, {}, {}", stageName, status, lastReplId );
+ LOG.debug("Stage ended {}, {}, {}", stageName, status, lastReplId);
Progress progress = replicationMetric.getProgress();
Stage stage = progress.getStageByName(stageName);
if(stage == null){
@@ -183,10 +184,35 @@ public void reportStageEnd(String stageName, Status status, long lastReplId,
}
}
+  public void reportStageEndWithLastExecutionId(String stageName, Status status, String errorLogPath, long lastDumpId)
+      throws SemanticException {
+ unRegisterMBeanSafe();
+ if (isEnabled) {
+ LOG.debug(STAGE_ENDED_LOG_FORMAT, stageName, status);
+ Progress progress = replicationMetric.getProgress();
+ Stage stage = progress.getStageByName(stageName);
+ if(stage == null){
+ stage = new Stage(stageName, status, -1L);
+ }
+ stage.setStatus(status);
+ stage.setEndTime(getCurrentTimeInMillis());
+ stage.setLastSuccessfulDumpId(lastDumpId);
+ if (errorLogPath != null) {
+ stage.setErrorLogPath(errorLogPath);
+ }
+ progress.addStage(stage);
+ replicationMetric.setProgress(progress);
+ metricCollector.addMetric(replicationMetric);
+      if (Status.FAILED == status || Status.FAILED_ADMIN == status || Status.SKIPPED == status) {
+ reportEnd(status);
+ }
+ }
+ }
+
  public void reportStageEnd(String stageName, Status status, String errorLogPath) throws SemanticException {
unRegisterMBeanSafe();
if (isEnabled) {
- LOG.debug("Stage Ended {}, {}", stageName, status );
+ LOG.debug(STAGE_ENDED_LOG_FORMAT, stageName, status);
Progress progress = replicationMetric.getProgress();
Stage stage = progress.getStageByName(stageName);
if(stage == null){
@@ -209,7 +235,7 @@ public void reportStageEnd(String stageName, Status status, String errorLogPath)
  public void reportStageEnd(String stageName, Status status) throws SemanticException {
unRegisterMBeanSafe();
if (isEnabled) {
- LOG.debug("Stage Ended {}, {}", stageName, status );
+ LOG.debug(STAGE_ENDED_LOG_FORMAT, stageName, status);
Progress progress = replicationMetric.getProgress();
Stage stage = progress.getStageByName(stageName);
if(stage == null){
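The new overload keeps the collector's start/end stage lifecycle; a driver uses it roughly like this (a sketch; metricCollector and lastDumpId are as in the ReplUtils helper above):

    // Open the stage, then close it as SKIPPED while attaching the execution
    // ID of the last successful dump for the UI.
    metricCollector.reportStageStart("REPL_LOAD", new HashMap<>());
    metricCollector.reportStageEndWithLastExecutionId(
        "REPL_LOAD", Status.SKIPPED, null /* errorLogPath */, lastDumpId);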
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
index 09bd15e8a04..1491a3ceae8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Stage.java
@@ -34,6 +34,7 @@ public class Stage {
private long endTime;
private long endTimeOnSrc;
private long endTimeOnTgt;
+ private long lastSuccessfulDumpId;
private Map<String, Metric> metrics = new HashMap<>();
private String errorLogPath;
  private SnapshotUtils.ReplSnapshotCount replSnapshotCount = new SnapshotUtils.ReplSnapshotCount();
@@ -62,6 +63,7 @@ public Stage(Stage stage) {
this.replStats = stage.replStats;
this.endTimeOnSrc = stage.endTimeOnSrc;
this.endTimeOnTgt = stage.endTimeOnTgt;
+ this.lastSuccessfulDumpId = stage.lastSuccessfulDumpId;
}
public String getName() {
@@ -112,6 +114,14 @@ public void setEndTimeOnTgt(long endTimeOnTgt) {
this.endTimeOnTgt = endTimeOnTgt;
}
+ public long getLastSuccessfulDumpId() {
+ return lastSuccessfulDumpId;
+ }
+
+ public void setLastSuccessfulDumpId(long lastSuccessfulDumpId) {
+ this.lastSuccessfulDumpId = lastSuccessfulDumpId;
+ }
+
public void addMetric(Metric metric) {
this.metrics.put(metric.getName(), metric);
}
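Stage's new field only reaches persisted metrics because StageMapper (next file) mirrors it as a bean property for JSON (de)serialization. A minimal round-trip sketch, assuming the jackson-databind ObjectMapper used by the metrics sink and a default StageMapper constructor:

    import com.fasterxml.jackson.databind.ObjectMapper;

    // Read a persisted stage record back and pick out the new field
    // (readValue throws JsonProcessingException).
    ObjectMapper mapper = new ObjectMapper();
    StageMapper stage = mapper.readValue("{\"lastSuccessfulDumpId\":42}", StageMapper.class);
    long dumpId = stage.getLastSuccessfulDumpId();  // 42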
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java
index c78b04de25e..c5dee9ea1ec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/StageMapper.java
@@ -37,6 +37,12 @@ public class StageMapper {
private long endTime = 0;
+ private long endTimeOnSrc = 0;
+
+ private long endTimeOnTgt = 0;
+
+ private long lastSuccessfulDumpId = 0;
+
private List<Metric> metrics = new ArrayList<>();
private String errorLogPath;
@@ -64,6 +70,30 @@ public long getEndTime() {
return endTime;
}
+ public long getEndTimeOnSrc() {
+ return endTimeOnSrc;
+ }
+
+ public void setEndTimeOnSrc(long endTimeOnSrc) {
+ this.endTimeOnSrc = endTimeOnSrc;
+ }
+
+ public long getEndTimeOnTgt() {
+ return endTimeOnTgt;
+ }
+
+ public void setEndTimeOnTgt(long endTimeOnTgt) {
+ this.endTimeOnTgt = endTimeOnTgt;
+ }
+
+ public long getLastSuccessfulDumpId() {
+ return lastSuccessfulDumpId;
+ }
+
+ public void setLastSuccessfulDumpId(long lastSuccessfulDumpId) {
+ this.lastSuccessfulDumpId = lastSuccessfulDumpId;
+ }
+
public List<Metric> getMetrics() {
return metrics;
}
diff --git a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
index ddf92089b7d..7d4a0035138 100644
--- a/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
+++ b/ql/src/test/results/clientpositive/llap/replication_metrics_ingest.q.out
@@ -92,5 +92,5 @@ POSTHOOK: type: QUERY
POSTHOOK: Input: sys@replication_metrics
POSTHOOK: Input: sys@replication_metrics_orig
#### A masked pattern was here ####
-repl1 1 {"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22PsQ6CMBRF/6Uzg6xsWjExQSC2TIaYBhsgKS15fZ1I/90iwYi6tefe0/c6EYsCnSUJYRWlKWMkmlErA7pNRItBhuyaltn9WF3KJf0jAPJ+ru4iIvXj+1xoBs0W8BZfYJAIfbOZdqpyys9FPj/dOACpkRqnlz4aFGq9+ugt8f0hS3+NeGvEvg47ABjITFsK7EiinVIRATkqpsVoO7OqH0H4sl2Ar/0T5NGeBTQBAAA= {"status":"SUCCESS","stages":[{ [...]
-repl2 1 {"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22QQQuDMAyF/0vPHubV21YdDERl1dOQUVzQQW0lpifpf1/VOXDbLe8l30vIxEaSZEcWMVFxngjBgtlqwVu3iWnZg+9dkyK9p/kxXrt/AKTyOY8eAgb68V3nWmCzN8qWFqMHwmez23auMl5e8myObiwiaOLG6nWeDEm1SRd8oPJ4SpNfItwToav9DYgGU9MWkjoWaatUwBAGJbQcxs5sqI+2PUeQBI9ltZcxKFilezP+G+Ma4mr3Atju2TJPA [...]
+repl1 1 {"dbName":"src","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.0,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22Qyw6CMBBF/6VrFrJlpwUTEhRiYWWIaUoFktKS6XRF+Hd5BCPqbnpuT3szA7HI0VkSEFZQGjFGvBnVckL3gWjeySm7RVnyCItLtqZ/BMC8na8ePCJ19T2nmoHYg7zGBShukTkhpLVPp0LX9XG1BJ1EaMWuxrm40jxOr/OfwgFIjdQ4vT6EBrnajqP3lvLjKYl+DX9v+GM5lQMwkJg649iQQDulPAKyV0zz3jZmUz+CaRd2BWM5vgCmU7uCTQEAAA== {"statu [...]
+repl2 1 {"dbName":"destination","replicationType":"BOOTSTRAP","replicatedDBSizeInKB":0.00390625,"stagingDir":"dummyDir","lastReplId":0,"failoverMetadataLoc":null,"failoverEventId":0,"failoverEndPoint":null,"failoverType":null} H4sIAAAAAAAA/22QMQvCMBSE/0vmDrq6aVpBKLaYdpIiIT6rkCbl5WWS/HdTo0LVLXeX73LkzhxJ8o6tmGg5L4Rg2WT1EK3jnRk5QMwORV2eymqdp/QPgNTcpquLjIE5f58rI1DNjaanp6GlI+GVAucuXud+GHfnZzAA4U3NZmzbPW921X56U3lEMMStN6mILEn9liH7QM16Uxa/xHJOLEMXxyFaLG1fS7qylfFaZwxh1MLI0V3tG43VfuAIkiBNjTIHDUmGF [...]
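The long H4sIA... columns above appear to be base64-encoded gzip JSON (H4sIA is the base64 form of the gzip magic bytes), which is why the expected output changes whenever a new field like lastSuccessfulDumpId enters the serialized progress. A small sketch for inspecting one of these blobs (standard JDK only; truncated blobs as shown above will not decode):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.zip.GZIPInputStream;

    public final class MetricBlobDecoder {
      // Decode a base64-wrapped, gzip-compressed JSON progress column.
      static String decodeProgressBlob(String blob) throws IOException {
        byte[] raw = Base64.getDecoder().decode(blob);
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(raw))) {
          return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
      }
    }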