suryaprasanna commented on code in PR #8900:
URL: https://github.com/apache/hudi/pull/8900#discussion_r1223923099
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1021,17 +1023,46 @@ private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient
    * deltacommit.
    */
   protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String latestDeltacommitTime) {
+
+    // Check if there are any pending compaction or log compaction instants in the timeline.
+    // If pending compact/logcompaction operations are found abort scheduling new compaction/logcompaction operations.
+    Option<HoodieInstant> pendingLogCompactionInstant =
+        metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
Review Comment:
We can run compaction and log compaction on the metadata table asynchronously. In that case, the ingestion writers will only create the plan and exit, and the async compaction and log compaction pipelines will execute it, so the plan's execution will be delayed.
Will be porting the async changes in another PR.
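For context, the pending-instant guard this hunk adds can be sketched abstractly. This is a toy model only: the `Instant` record and `CompactionGuard` class below are simple stand-ins, not Hudi's actual `HoodieInstant` or timeline classes.

```java
import java.util.List;
import java.util.Optional;

// Toy model of the scheduling guard in compactIfNecessary: skip scheduling a
// new compaction or log compaction while a pending plan for either service is
// already on the timeline. With async execution, an ingestion writer may have
// created a plan and exited, so a pending plan means another pipeline is (or
// will be) executing it.
class CompactionGuard {
  // Stand-in for HoodieInstant: an action name plus a "completed" flag.
  record Instant(String action, boolean completed) {}

  static Optional<Instant> firstPending(List<Instant> timeline, String action) {
    return timeline.stream()
        .filter(i -> i.action().equals(action) && !i.completed())
        .findFirst();
  }

  // Mirrors the early return: only schedule when neither service has a pending plan.
  static boolean canScheduleCompaction(List<Instant> timeline) {
    return firstPending(timeline, "compaction").isEmpty()
        && firstPending(timeline, "logcompaction").isEmpty();
  }
}
```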
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java:
##########
@@ -97,6 +98,11 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
    */
   BaseHoodieWriteClient getWriteClient();
+
+  /**
+   * Returns the meta client for the metadata table.
+   */
+  HoodieTableMetaClient getMetadataMetaClient();
Review Comment:
This is useful for async table services on the metadata table to check for
inflight compaction and log compaction plans.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java:
##########
@@ -47,6 +50,7 @@ public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
       String fileId,
       TaskContextSupplier taskContextSupplier,
       Option<BaseKeyGenerator> keyGeneratorOpt) {
+    LOG.info("Get updateHandle for fileId " + fileId + " and partitionPath " + partitionPath + " at commit " + instantTime);
Review Comment:
Done.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1325,6 +1332,39 @@ public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableC
     return inflightAndCompletedPartitions;
   }
+
+  public static Set<String> getValidInstantTimestamps(HoodieTableMetaClient dataMetaClient,
+                                                      HoodieTableMetaClient metadataMetaClient) {
+    // Only those log files which have a corresponding completed instant on the dataset should be read.
+    // This is because the metadata table is updated before the dataset instants are committed.
+    HoodieActiveTimeline datasetTimeline = dataMetaClient.getActiveTimeline();
+    Set<String> validInstantTimestamps = datasetTimeline.filterCompletedInstants().getInstantsAsStream()
+        .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+
+    // We should also add completed indexing delta commits in the metadata table, as they do not
+    // have corresponding completed instant in the data table
+    validInstantTimestamps.addAll(
+        metadataMetaClient.getActiveTimeline()
+            .filter(instant -> instant.isCompleted()
+                && (isIndexingCommit(instant.getTimestamp()) || isLogCompactionInstant(instant)))
+            .getInstantsAsStream()
+            .map(HoodieInstant::getTimestamp)
+            .collect(Collectors.toList()));
+
+    // For any rollbacks and restores, we cannot neglect the instants that they are rolling back.
+    // The rollback instant should be more recent than the start of the timeline for it to have rolled back any
+    // instant which we have a log block for.
+    final String earliestInstantTime = validInstantTimestamps.isEmpty() ? SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
+    datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream()
+        .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, earliestInstantTime))
+        .forEach(instant -> validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline)));
+
+    // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid timestamp
+    validInstantTimestamps.add(SOLO_COMMIT_TIMESTAMP);
Review Comment:
Yeah, updated it.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleFactory.java:
##########
@@ -47,6 +50,7 @@ public static <T, I, K, O> HoodieMergeHandle<T, I, K, O> create(
       String fileId,
       TaskContextSupplier taskContextSupplier,
       Option<BaseKeyGenerator> keyGeneratorOpt) {
+    LOG.info("Get updateHandle for fileId " + fileId + " and partitionPath " + partitionPath + " at commit " + instantTime);
Review Comment:
There could be cases where the record index returns file groups which are not actually present in the file system; in those cases, "no Option present" errors will be thrown. This log will make debugging those cases easier.
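A minimal illustration of the failure mode described here, using stand-in types rather than Hudi's actual record index or file system view: looking up a file group id that no longer exists on disk yields an empty `Optional`, and forcing it throws. Logging the fileId and partitionPath before the lookup makes such failures traceable.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;

// Toy model: the index maps a record key's file group id to a base file, but
// the file system may no longer contain that file group. Forcing the empty
// Optional then fails with a NoSuchElementException ("No value present").
class StaleIndexLookup {
  static String resolveBaseFile(Map<String, String> filesOnDisk, String fileIdFromIndex) {
    Optional<String> baseFile = Optional.ofNullable(filesOnDisk.get(fileIdFromIndex));
    // Throws when the index points at a file group absent from the file system.
    return baseFile.orElseThrow(() ->
        new NoSuchElementException("No base file for fileId " + fileIdFromIndex));
  }
}
```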
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1021,17 +1023,46 @@ private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient
    * deltacommit.
    */
   protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String latestDeltacommitTime) {
+
+    // Check if there are any pending compaction or log compaction instants in the timeline.
+    // If pending compact/logcompaction operations are found abort scheduling new compaction/logcompaction operations.
+    Option<HoodieInstant> pendingLogCompactionInstant =
+        metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+    Option<HoodieInstant> pendingCompactionInstant =
+        metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+    if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) {
+      LOG.info(String.format("Not scheduling compaction or logcompaction, since a pending compaction instant %s or logcompaction %s instant is present",
+          pendingCompactionInstant, pendingLogCompactionInstant));
+      return;
+    }
+
     // Trigger compaction with suffixes based on the same instant time. This ensures that any future
     // delta commits synced over will not have an instant time lesser than the last completed instant on the
     // metadata table.
     final String compactionInstantTime = HoodieTableMetadataUtil.createCompactionTimestamp(latestDeltacommitTime);
+
     // we need to avoid checking compaction w/ same instant again.
     // let's say we trigger compaction after C5 in MDT and so compaction completes with C4001. but C5 crashed before completing in MDT.
     // and again w/ C6, we will re-attempt compaction at which point latest delta commit is C4 in MDT.
     // and so we try compaction w/ instant C4001. So, we can avoid compaction if we already have compaction w/ same instant time.
-    if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)
-        && writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+    if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) {
+      LOG.info(String.format("Compaction with same %s time is already present in the timeline.", compactionInstantTime));
+      return;
+    } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+      LOG.info("Compaction is scheduled for timestamp " + compactionInstantTime);
       writeClient.compact(compactionInstantTime);
+    } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+      // Schedule and execute log compaction with suffixes based on the same instant time. This ensures that any future
+      // delta commits synced over will not have an instant time lesser than the last completed instant on the
+      // metadata table.
+      final String logCompactionInstantTime = HoodieTableMetadataUtil.createLogCompactionTimestamp(latestDeltacommitTime);
+      if (metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime)) {
+        LOG.info(String.format("Log compaction with same %s time is already present in the timeline.", logCompactionInstantTime));
+        return;
Review Comment:
Yeah, removed the unnecessary returns in this class.
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java:
##########
@@ -233,48 +233,49 @@ void testSyncMetadataTable() throws Exception {
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), startsWith(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
     // test metadata table compaction
-    // write another 4 commits
-    for (int i = 1; i < 5; i++) {
+    // write another 9 commits to trigger compaction twice. Since default clean version to retain is 2.
Review Comment:
Clean's retainFileVersions value is changed to 2; it is hardcoded in HoodieMetadataWriteUtils, and both Spark and Flink get the write config from there, so it cannot be overridden.
The reason for making the change is to support restore of the main table timeline for at least 10 commits. A restore on the main table also causes a restore on the metadata table. If retainFileVersions were less than 2, the next cleaner run after a compaction would remove the previous file slice, thereby blocking restore on the metadata table or losing data.
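The retention argument above can be illustrated with a toy version-based cleaning model for a single file group. This is not Hudi's actual cleaner; it only shows why retaining a single file version breaks restore after the next cleaner run, while retaining two preserves the previous slice.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model: each compaction adds a new file slice; the cleaner keeps only the
// latest `retainFileVersions` slices. Restore must roll back to the slice that
// preceded the latest compaction, so that slice must survive cleaning.
class FileSliceRetention {
  private final Deque<String> slices = new ArrayDeque<>(); // newest first
  private final int retainFileVersions;

  FileSliceRetention(int retainFileVersions) {
    this.retainFileVersions = retainFileVersions;
  }

  void compact(String newSlice) {
    slices.addFirst(newSlice);
  }

  void clean() {
    while (slices.size() > retainFileVersions) {
      slices.removeLast(); // cleaner drops the oldest slice
    }
  }

  // Restore needs the slice before the newest one to still be present.
  boolean canRestoreToPreviousSlice() {
    return slices.size() >= 2;
  }
}
```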
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1021,17 +1023,46 @@ private void runPendingTableServicesOperations(BaseHoodieWriteClient writeClient
    * deltacommit.
    */
   protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String latestDeltacommitTime) {
+
+    // Check if there are any pending compaction or log compaction instants in the timeline.
+    // If pending compact/logcompaction operations are found abort scheduling new compaction/logcompaction operations.
+    Option<HoodieInstant> pendingLogCompactionInstant =
+        metadataMetaClient.getActiveTimeline().filterPendingLogCompactionTimeline().firstInstant();
+    Option<HoodieInstant> pendingCompactionInstant =
+        metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
+    if (pendingLogCompactionInstant.isPresent() || pendingCompactionInstant.isPresent()) {
Review Comment:
Good point. Refactored code.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -985,9 +985,10 @@ public void performTableServices(Option<String> inFlightInstantTimestamp) {
     // Do timeline validation before scheduling compaction/logcompaction operations.
     if (validateTimelineBeforeSchedulingCompaction(inFlightInstantTimestamp, latestDeltacommitTime)) {
+      LOG.info("No inflight commits present in either of the timelines. "
+          + "Proceeding to check compaction.");
Review Comment:
Yeah, good point. I added it to check whether validateTimelineBeforeSchedulingCompaction succeeded, but with the existing code it can be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]