nsivabalan commented on code in PR #12948:
URL: https://github.com/apache/hudi/pull/12948#discussion_r1992507574
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1726,4 +1659,302 @@ public boolean isInitialized() {
}
protected abstract BaseHoodieWriteClient<?, I, ?, ?> initializeWriteClient();
+
+ protected class MetadataWriteHandler implements Serializable {
+ HoodieTableVersion tableVersion;
+
+ public MetadataWriteHandler(HoodieTableVersion tableVersion) {
+ this.tableVersion = tableVersion;
+ }
+
+ public boolean shouldInitializeFromFilesystem(Set<String>
pendingDataInstants, Option<String> inflightInstantTimestamp) {
+ if (tableVersion.lesserThan(HoodieTableVersion.EIGHT) &&
pendingDataInstants.stream()
+ .anyMatch(i -> !inflightInstantTimestamp.isPresent() ||
!i.equals(inflightInstantTimestamp.get()))) {
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
+ LOG.warn("Cannot initialize metadata table as operation(s) are in
progress on the dataset: {}",
+ Arrays.toString(pendingDataInstants.toArray()));
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ public String getTimelineHistoryPath() {
+ return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+ ? TIMELINE_HISTORY_PATH.defaultValue() :
ARCHIVELOG_FOLDER.defaultValue();
+ }
+
+ /**
+ * Validates the timeline for both main and metadata tables to ensure
compaction on MDT can be scheduled.
+ */
+ protected boolean validateCompactionScheduling(Option<String>
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+ if (tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
+ return
validateCompactionSchedulingVersionSix(inFlightInstantTimestamp,
latestDeltaCommitTimeInMetadataTable);
+ } else {
+ // Under the log compaction scope, the sequence of the log-compaction
and compaction needs to be ensured because metadata items such as RLI
+ // only has proc-time ordering semantics. For "ensured", it means the
completion sequence of the log-compaction/compaction is the same as the start
sequence.
+ if (metadataWriteConfig.isLogCompactionEnabled()) {
+ Option<HoodieInstant> pendingLogCompactionInstant =
+
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+ Option<HoodieInstant> pendingCompactionInstant =
+
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+ if (pendingLogCompactionInstant.isPresent() ||
pendingCompactionInstant.isPresent()) {
+ LOG.warn("Not scheduling compaction or logCompaction, since a
pending compaction instant {} or logCompaction {} instant is present",
+ pendingCompactionInstant, pendingLogCompactionInstant);
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Validates the timeline for both main and metadata tables to ensure
compaction on MDT can be scheduled.
+ */
+ protected boolean validateCompactionSchedulingVersionSix(Option<String>
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+ // we need to find if there are any inflights in data table timeline
before or equal to the latest delta commit in metadata table.
+ // Whenever you want to change this logic, please ensure all below
scenarios are considered.
+ // a. There could be a chance that latest delta commit in MDT is
committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should
be employed
+ // b. There could be DT inflights after latest delta commit in MDT and
we are ok with it. bcoz, the contract is, the latest compaction instant time in
MDT represents
+ // any instants before that is already synced with metadata table.
+ // c. Do consider out of order commits. For eg, c4 from DT could
complete before c3. and we can't trigger compaction in MDT with c4 as base
instant time, until every
+ // instant before c4 is synced with metadata table.
+ List<HoodieInstant> pendingInstants =
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
+
+ if (!pendingInstants.isEmpty()) {
+ checkNumDeltaCommits(metadataMetaClient,
dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
+ LOG.info(String.format(
+ "Cannot compact metadata table as there are %d inflight instants
in data table before latest deltacommit in metadata table: %s. Inflight
instants in data table: %s",
+ pendingInstants.size(), latestDeltaCommitTimeInMetadataTable,
Arrays.toString(pendingInstants.toArray())));
+ return false;
+ }
+
+ // Check if there are any pending compaction or log compaction instants
in the timeline.
+ // If pending compact/logCompaction operations are found abort
scheduling new compaction/logCompaction operations.
+ Option<HoodieInstant> pendingLogCompactionInstant =
+
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+ Option<HoodieInstant> pendingCompactionInstant =
+
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+ if (pendingLogCompactionInstant.isPresent() ||
pendingCompactionInstant.isPresent()) {
+ LOG.warn(String.format("Not scheduling compaction or logCompaction,
since a pending compaction instant %s or logCompaction %s instant is present",
+ pendingCompactionInstant, pendingLogCompactionInstant));
+ return false;
+ }
+ return true;
+ }
+
+ protected void checkNumDeltaCommits(HoodieTableMetaClient metaClient, int
maxNumDeltaCommitsWhenPending) {
+ final HoodieActiveTimeline activeTimeline =
metaClient.reloadActiveTimeline();
+ Option<HoodieInstant> lastCompaction =
activeTimeline.filterCompletedInstants()
+ .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant();
+ int numDeltaCommits = lastCompaction.isPresent()
+ ?
activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().requestedTime()).countInstants()
+ : activeTimeline.getDeltaCommitTimeline().countInstants();
+ if (numDeltaCommits > maxNumDeltaCommitsWhenPending) {
+ throw new HoodieMetadataException(String.format("Metadata table's
deltacommits exceeded %d: "
+ + "this is likely caused by a pending instant in the data
table. Resolve the pending instant "
+ + "or adjust `%s`, then restart the pipeline.",
+ maxNumDeltaCommitsWhenPending,
HoodieMetadataConfig.METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING.key()));
+ }
+ }
+
+ /**
+ * Perform a compaction on the Metadata Table.
+ * <p>
+ * Cases to be handled:
+ * 1. We cannot perform compaction if there are previous inflight
operations on the dataset. This is because
+ * a compacted metadata base file at time Tx should represent all the
actions on the dataset till time Tx.
+ * <p>
+ * 2. In multi-writer scenario, a parallel operation with a greater
instantTime may have completed creating a
+ * deltacommit.
+ */
+ protected void compactIfNecessary(BaseHoodieWriteClient writeClient,
String latestDeltacommitTime) {
+ if (tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
+ compactIfNecessaryVersionSix(writeClient, latestDeltacommitTime);
+ } else {
+ // IMPORTANT: Trigger compaction with max instant time that is smaller
than(or equals) the earliest pending instant from DT.
+ // The compaction planner will manage to filter out the log files that
finished with greater completion time.
+ // see BaseHoodieCompactionPlanGenerator.generateCompactionPlan for
more details.
+ HoodieTimeline metadataCompletedTimeline =
metadataMetaClient.getActiveTimeline().filterCompletedInstants();
+ final String compactionInstantTime =
dataMetaClient.reloadActiveTimeline()
+ // The filtering strategy is kept in line with the rollback
premise, if an instant is pending on DT but completed on MDT,
+ // generates a compaction time smaller than it so that the instant
could then been rolled back.
+ .filterInflightsAndRequested().filter(instant ->
metadataCompletedTimeline.containsInstant(instant.requestedTime())).firstInstant()
+ // minus the pending instant time by 1 millisecond to avoid
conflicts on the MDT.
+ .map(instant ->
HoodieInstantTimeGenerator.instantTimeMinusMillis(instant.requestedTime(), 1L))
+ .orElse(writeClient.createNewInstantTime(false));
+
+ // we need to avoid checking compaction w/ same instant again.
+ // let's say we trigger compaction after C5 in MDT and so compaction
completes with C4001. but C5 crashed before completing in MDT.
+ // and again w/ C6, we will re-attempt compaction at which point
latest delta commit is C4 in MDT.
+ // and so we try compaction w/ instant C4001. So, we can avoid
compaction if we already have compaction w/ same instant time.
+ if
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
{
+ LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
+ } else if
(writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
+ LOG.info("Compaction is scheduled for timestamp {}",
compactionInstantTime);
+ writeClient.compact(compactionInstantTime);
+ } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+ // Schedule and execute log compaction with new instant time.
+ final String logCompactionInstantTime =
metadataMetaClient.createNewInstantTime(false);
+ if
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime))
{
+ LOG.info("Log compaction with same {} time is already present in
the timeline.", logCompactionInstantTime);
+ } else if
(writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime,
Option.empty())) {
+ LOG.info("Log compaction is scheduled for timestamp {}",
logCompactionInstantTime);
+ writeClient.logCompact(logCompactionInstantTime);
+ }
+ }
+ }
+ }
+
+ /**
+ * Perform a compaction on the Metadata Table.
+ * <p>
+ * Cases to be handled:
+ * 1. We cannot perform compaction if there are previous inflight
operations on the dataset. This is because
+ * a compacted metadata base file at time Tx should represent all the
actions on the dataset till time Tx.
+ * <p>
+ * 2. In multi-writer scenario, a parallel operation with a greater
instantTime may have completed creating a
+ * deltacommit.
+ */
+ protected void compactIfNecessaryVersionSix(BaseHoodieWriteClient
writeClient, String latestDeltacommitTime) {
Review Comment:
Again, let's revisit the Javadocs here. Can you copy the docs over from 0.15.0?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1726,4 +1659,302 @@ public boolean isInitialized() {
}
protected abstract BaseHoodieWriteClient<?, I, ?, ?> initializeWriteClient();
+
+ protected class MetadataWriteHandler implements Serializable {
+ HoodieTableVersion tableVersion;
+
+ public MetadataWriteHandler(HoodieTableVersion tableVersion) {
+ this.tableVersion = tableVersion;
+ }
+
+ public boolean shouldInitializeFromFilesystem(Set<String>
pendingDataInstants, Option<String> inflightInstantTimestamp) {
+ if (tableVersion.lesserThan(HoodieTableVersion.EIGHT) &&
pendingDataInstants.stream()
+ .anyMatch(i -> !inflightInstantTimestamp.isPresent() ||
!i.equals(inflightInstantTimestamp.get()))) {
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
+ LOG.warn("Cannot initialize metadata table as operation(s) are in
progress on the dataset: {}",
+ Arrays.toString(pendingDataInstants.toArray()));
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ public String getTimelineHistoryPath() {
+ return tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
+ ? TIMELINE_HISTORY_PATH.defaultValue() :
ARCHIVELOG_FOLDER.defaultValue();
+ }
+
+ /**
+ * Validates the timeline for both main and metadata tables to ensure
compaction on MDT can be scheduled.
+ */
+ protected boolean validateCompactionScheduling(Option<String>
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+ if (tableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
+ return
validateCompactionSchedulingVersionSix(inFlightInstantTimestamp,
latestDeltaCommitTimeInMetadataTable);
+ } else {
+ // Under the log compaction scope, the sequence of the log-compaction
and compaction needs to be ensured because metadata items such as RLI
+ // only has proc-time ordering semantics. For "ensured", it means the
completion sequence of the log-compaction/compaction is the same as the start
sequence.
+ if (metadataWriteConfig.isLogCompactionEnabled()) {
+ Option<HoodieInstant> pendingLogCompactionInstant =
+
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+ Option<HoodieInstant> pendingCompactionInstant =
+
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+ if (pendingLogCompactionInstant.isPresent() ||
pendingCompactionInstant.isPresent()) {
+ LOG.warn("Not scheduling compaction or logCompaction, since a
pending compaction instant {} or logCompaction {} instant is present",
+ pendingCompactionInstant, pendingLogCompactionInstant);
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * Validates the timeline for both main and metadata tables to ensure
compaction on MDT can be scheduled.
+ */
+ protected boolean validateCompactionSchedulingVersionSix(Option<String>
inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) {
+ // we need to find if there are any inflights in data table timeline
before or equal to the latest delta commit in metadata table.
+ // Whenever you want to change this logic, please ensure all below
scenarios are considered.
+ // a. There could be a chance that latest delta commit in MDT is
committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should
be employed
+ // b. There could be DT inflights after latest delta commit in MDT and
we are ok with it. bcoz, the contract is, the latest compaction instant time in
MDT represents
+ // any instants before that is already synced with metadata table.
+ // c. Do consider out of order commits. For eg, c4 from DT could
complete before c3. and we can't trigger compaction in MDT with c4 as base
instant time, until every
+ // instant before c4 is synced with metadata table.
+ List<HoodieInstant> pendingInstants =
dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+
.findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants();
+
+ if (!pendingInstants.isEmpty()) {
+ checkNumDeltaCommits(metadataMetaClient,
dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending());
+ LOG.info(String.format(
+ "Cannot compact metadata table as there are %d inflight instants
in data table before latest deltacommit in metadata table: %s. Inflight
instants in data table: %s",
+ pendingInstants.size(), latestDeltaCommitTimeInMetadataTable,
Arrays.toString(pendingInstants.toArray())));
+ return false;
+ }
+
+ // Check if there are any pending compaction or log compaction instants
in the timeline.
+ // If pending compact/logCompaction operations are found abort
scheduling new compaction/logCompaction operations.
+ Option<HoodieInstant> pendingLogCompactionInstant =
+
metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+ Option<HoodieInstant> pendingCompactionInstant =
+
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+ if (pendingLogCompactionInstant.isPresent() ||
pendingCompactionInstant.isPresent()) {
+ LOG.warn(String.format("Not scheduling compaction or logCompaction,
since a pending compaction instant %s or logCompaction %s instant is present",
+ pendingCompactionInstant, pendingLogCompactionInstant));
+ return false;
+ }
+ return true;
+ }
+
+ protected void checkNumDeltaCommits(HoodieTableMetaClient metaClient, int
maxNumDeltaCommitsWhenPending) {
+ final HoodieActiveTimeline activeTimeline =
metaClient.reloadActiveTimeline();
+ Option<HoodieInstant> lastCompaction =
activeTimeline.filterCompletedInstants()
+ .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant();
+ int numDeltaCommits = lastCompaction.isPresent()
+ ?
activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().requestedTime()).countInstants()
+ : activeTimeline.getDeltaCommitTimeline().countInstants();
+ if (numDeltaCommits > maxNumDeltaCommitsWhenPending) {
+ throw new HoodieMetadataException(String.format("Metadata table's
deltacommits exceeded %d: "
+ + "this is likely caused by a pending instant in the data
table. Resolve the pending instant "
+ + "or adjust `%s`, then restart the pipeline.",
+ maxNumDeltaCommitsWhenPending,
HoodieMetadataConfig.METADATA_MAX_NUM_DELTACOMMITS_WHEN_PENDING.key()));
+ }
+ }
+
+ /**
+ * Perform a compaction on the Metadata Table.
+ * <p>
+ * Cases to be handled:
+ * 1. We cannot perform compaction if there are previous inflight
operations on the dataset. This is because
+ * a compacted metadata base file at time Tx should represent all the
actions on the dataset till time Tx.
+ * <p>
+ * 2. In multi-writer scenario, a parallel operation with a greater
instantTime may have completed creating a
+ * deltacommit.
+ */
+ protected void compactIfNecessary(BaseHoodieWriteClient writeClient,
String latestDeltacommitTime) {
Review Comment:
These Javadocs need to go after L1775.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -155,6 +160,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
protected StorageConfiguration<?> storageConf;
protected final transient HoodieEngineContext engineContext;
protected final List<MetadataPartitionType> enabledPartitionTypes;
+ protected final MetadataWriteHandler metadataWriteHandler;
Review Comment:
Can you add Javadocs explaining the purpose of the write handler?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseTableServicePlanActionExecutor.java:
##########
@@ -124,7 +124,9 @@ public Pair<Option<HoodieInstant>, Set<String>>
getIncrementalPartitions(TableSe
.filter(this::filterCommitByTableType).flatMap(instant -> {
try {
String completionTime = instant.getCompletionTime();
- if (completionTime.compareTo(leftBoundary) >= 0 &&
completionTime.compareTo(rightBoundary) < 0) {
+ if (completionTime.compareTo(leftBoundary) >= 0
+ && ((instant.requestedTime().length() <
completionTime.length() && instant.requestedTime().compareTo(rightBoundary) < 0)
Review Comment:
Yes, and let's add Javadocs.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -344,9 +355,13 @@ private boolean isBootstrapNeeded(Option<HoodieInstant>
latestMetadataInstant) {
*
* @param initializationTime - Timestamp to use for the commit
* @param partitionsToInit - List of MDT partitions to initialize
+ * @param inflightInstantTimestamp - Current action instant responsible for
this initialization
*/
- private void initializeFromFilesystem(String initializationTime,
List<MetadataPartitionType> partitionsToInit) throws IOException {
+ private boolean initializeFromFilesystem(String initializationTime,
List<MetadataPartitionType> partitionsToInit, Option<String>
inflightInstantTimestamp) throws IOException {
Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);
+ if
(!metadataWriteHandler.shouldInitializeFromFilesystem(pendingDataInstants,
inflightInstantTimestamp)) {
Review Comment:
For table version 6, if there are pending instants in the data table, it looks like we return false here — so at line 288 above, do we then log an error message and return? Is that expected?
I feel we can remove that logging statement, since shouldInitializeFromFilesystem already emits a log.warn.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]