nsivabalan commented on code in PR #5958:
URL: https://github.com/apache/hudi/pull/5958#discussion_r956461386
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##########
@@ -163,6 +183,21 @@ public class HoodieCompactionConfig extends HoodieConfig {
+          + "record size estimate compute dynamically based on commit metadata. "
+          + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
+  public static final ConfigProperty<Long> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.archive.merge.small.file.limit.bytes")
Review Comment:
These are already in HoodieArchivalConfig, right? Did you move them here, or add new ones?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -240,8 +245,9 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
int keepVersions = config.getCleanerFileVersionsRetained();
// do not cleanup slice required for pending compaction
     Iterator<FileSlice> fileSliceIterator =
-        fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
-    if (isFileGroupInPendingCompaction(fileGroup)) {
+        fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)
+            && !isFileSliceNeededForPendingLogCompaction(fs)).iterator();
+    if (isFileGroupInPendingCompaction(fileGroup) || isFileGroupInPendingLogCompaction(fileGroup)) {
Review Comment:
same here.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.plan.generators;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
+
+public class HoodieCompactionPlanGenerator<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieCompactionPlanGenerator {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieCompactionPlanGenerator.class);
+
+  public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ /**
+ * Generate a new compaction plan for scheduling.
+ * @return Compaction Plan
+ * @throws java.io.IOException when encountering errors
+ */
+ @Override
+ public HoodieCompactionPlan generateCompactionPlan() throws IOException {
Review Comment:
I assume this is just moved without any changes. Let me know if you changed anything in these code blocks.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -456,6 +491,23 @@ public List<WriteStatus> close() {
}
}
+  public void write(Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
+    Iterator<String> keyIterator = recordMap.keySet().stream().iterator();
Review Comment:
Can't we iterate over the entries rather than just the keys? Why do an additional lookup in L499?
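The suggestion, sketched with a hypothetical `Record` stand-in for `HoodieRecord` to keep it self-contained: iterating `entrySet()` yields each value directly, so the per-key `recordMap.get(key)` lookup disappears.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EntryIterationSketch {
  // Hypothetical stand-in for HoodieRecord, not the real class.
  static class Record {
    final String key;
    Record(String key) { this.key = key; }
  }

  // Iterating entries gives both key and value in one pass,
  // avoiding the extra recordMap.get(key) lookup per record.
  static int writeAll(Map<String, Record> recordMap) {
    int written = 0;
    for (Map.Entry<String, Record> entry : recordMap.entrySet()) {
      Record record = entry.getValue(); // value available directly
      // in the real handle this is where init(record) / writeToBuffer(record) would go
      written++;
    }
    return written;
  }

  public static void main(String[] args) {
    Map<String, Record> m = new LinkedHashMap<>();
    m.put("a", new Record("a"));
    m.put("b", new Record("b"));
    System.out.println(writeAll(m)); // prints 2
  }
}
```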
##########
hudi-common/src/test/java/org/apache/hudi/common/table/view/FileSystemViewExpectedState.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.view;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class FileSystemViewExpectedState {
Review Comment:
Can't we define this as a static inner class in one of the test classes?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java:
##########
@@ -143,6 +148,11 @@ protected Map<HoodieFileGroupId, Pair<String, CompactionOperation>> createFileId
return fileIdToPendingCompaction;
}
+  protected Map<HoodieFileGroupId, Pair<String, CompactionOperation>> createFileIdToPendingLogCompactionMap(
+      Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fileIdToPendingLogCompaction) {
+    return fileIdToPendingLogCompaction;
Review Comment:
Not sure what the use of this method is; it takes an argument and returns it as is.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##########
@@ -266,6 +272,20 @@ public class HoodieCompactionConfig extends HoodieConfig {
.sinceVersion("0.11.0")
     .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
+  public static final ConfigProperty<String> PARTITIONS_FOR_COMPACTION = ConfigProperty
Review Comment:
Agreed, let's not have this as a top-level config. Internally, just for the metadata table, we can trigger this only for the record index partition and avoid other partitions.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##########
@@ -163,6 +183,21 @@ public class HoodieCompactionConfig extends HoodieConfig {
+          + "record size estimate compute dynamically based on commit metadata. "
+          + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
+  public static final ConfigProperty<Long> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.archive.merge.small.file.limit.bytes")
+      .defaultValue(20L * 1024 * 1024)
+      .withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as such a small file.");
+
+  public static final ConfigProperty<Boolean> ARCHIVE_MERGE_ENABLE = ConfigProperty
+      .key("hoodie.archive.merge.enable")
+      .defaultValue(false)
+      .withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
+          + " useful when storage scheme doesn't support append operation.");
+
+  public static final ConfigProperty<String> LOG_COMPACTION_BLOCKS_THRESHOLD = ConfigProperty
+      .key("hoodie.log.compaction.blocks.threshold")
+      .defaultValue("5")
+      .withDocumentation("Log compaction can be scheduled if the no. of log blocks crosses this threshold value.");
Review Comment:
Looks like as of now we only have a num_delta_commits-based triggering strategy, and no size-based one, right? Can we file a tracking JIRA for this?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -369,7 +370,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
protected void rollbackFailedBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
-    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
+    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
Review Comment:
By "minor", we are referring to log compaction, is that right? If yes, can we standardize the terminology everywhere? Compacting base files + log files we can call major compaction; stitching multiple smaller log files into a large log file we can call minor compaction.
I understand that's not what major and minor compaction mean in the general sense, but let's be consistent. For example, I see APIs like writeConfig.inlineLogCompactionEnabled and inlineCompactionEnabled, which are not very apparent and do not use the same convention as table.getMetaClient().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction().
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java:
##########
@@ -118,6 +118,12 @@ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInst
archivedMetaWrapper.setActionType(ActionType.compaction.name());
break;
}
+ case HoodieTimeline.LOG_COMPACTION_ACTION: {
Review Comment:
Did we add any tests covering archival flows? I.e., after a partial log compaction completes in the timeline, if the instant is eligible for archival, does archival succeed? And does reading them back also work fine?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionStrategy.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieCompactionHandler;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
+
+public class LogCompactionExecutionStrategy<T extends HoodieRecordPayload, I, K, O> implements Serializable {
Review Comment:
Javadocs, please.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -456,6 +491,23 @@ public List<WriteStatus> close() {
}
}
+  public void write(Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
+    Iterator<String> keyIterator = recordMap.keySet().stream().iterator();
+ try {
+ while (keyIterator.hasNext()) {
+ final String key = keyIterator.next();
+ HoodieRecord<T> record = (HoodieRecord<T>) recordMap.get(key);
+ init(record);
+        // For logCompaction operations all the records are read and written as a huge block.
Review Comment:
So we don't do any rollover based on log file size, is that right? The target log file could be very huge as well? Can we fix that? Since we have the average record size and the number of records, it shouldn't be too hard to do a rollover once a certain size is met.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -540,6 +565,10 @@ public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
rollbackInflightCompaction(inflightInstant, s -> Option.empty());
}
+ public void rollbackInflightLogCompaction(HoodieInstant inflightInstant) {
+ rollbackInflightLogCompaction(inflightInstant, s -> Option.empty());
Review Comment:
Shouldn't we pass getPendingRollbackInstantFunc as the second argument?
The reason we introduced the pending rollback lookup is as follows.
Say commit C5 partially failed. When we are about to start a new commit, say C7, we detect that C5 has failed and trigger a rollback named RB_5.
But say the process crashed midway through RB_5. Next time we restart the pipeline for, say, C8, we again detect that C5 partially failed and want to roll it back. But we don't want to create RB_9 for this; we want to re-use RB_5 and get it to completion.
So rollbackPendingCompaction or rollbackPendingClustering should always try to re-use existing rollback instants, if any.
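The re-use rule described in the scenario above can be sketched as follows. All types here are simplified stand-ins, not the actual Hudi API: the lookup function plays the role of getPendingRollbackInstantFunc, and instants are plain strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class RollbackReuseSketch {
  // Re-use a pending rollback instant for the failed commit if one exists
  // (e.g. RB_5 for C5); only otherwise schedule a brand-new rollback instant.
  static String rollbackInstantFor(String failedCommit,
                                   Function<String, Optional<String>> pendingRollbackLookup,
                                   String newInstantTime) {
    return pendingRollbackLookup.apply(failedCommit).orElse(newInstantTime);
  }

  public static void main(String[] args) {
    Map<String, String> pending = new HashMap<>();
    pending.put("c5", "rb_5"); // RB_5 crashed midway and is still pending
    Function<String, Optional<String>> lookup = c -> Optional.ofNullable(pending.get(c));
    System.out.println(rollbackInstantFor("c5", lookup, "rb_new")); // prints rb_5
    System.out.println(rollbackInstantFor("c6", lookup, "rb_new")); // prints rb_new
  }
}
```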
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -379,11 +407,16 @@ public void doAppend() {
flushToDiskIfRequired(record);
writeToBuffer(record);
}
- appendDataAndDeleteBlocks(header);
+ appendDataAndDeleteBlocks(header, true);
estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
}
-  protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header) {
+  /**
+   * Appends data and delete blocks. When appendDeleteBlocks value is false, only data blocks are appended.
+   * This is done so that all the data blocks are created first and then a single delete block is added.
+   * Otherwise what can end up happening is creation of multiple small delete blocks get added after each data block.
Review Comment:
Won't this result in wrong data being served?
Say we have:
Data_block_1
Delete_block_2
Data_block_3
where data_block_3 has some records that were deleted in delete_block_2 and then inserted again.
Without any minor compaction, when we do a snapshot read, we should be able to read the deleted records that were re-inserted (since data_block_3 wins).
But according to this new logic, won't we compact to:
Data_block_1, Data_block_3, Delete_block_2?
In that case, the deletion will win, overriding data_block_3.
Let me know if I am missing something.
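To make the concern concrete, here is a toy model of block application. This deliberately simplifies the real log reader (which merges per key with payload semantics), but it shows how reordering a delete block after a later data block flips the outcome for a re-inserted key:

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class BlockOrderSketch {
  // Toy log block: either a data block (inserts keys) or a delete block (removes keys).
  static class Block {
    final boolean isDelete;
    final Set<String> keys;
    Block(boolean isDelete, String... keys) {
      this.isDelete = isDelete;
      this.keys = new LinkedHashSet<>(Arrays.asList(keys));
    }
  }

  // Apply blocks in file order: data blocks insert keys, delete blocks remove them.
  static Set<String> merge(List<Block> blocks) {
    Set<String> live = new LinkedHashSet<>();
    for (Block b : blocks) {
      if (b.isDelete) {
        live.removeAll(b.keys);
      } else {
        live.addAll(b.keys);
      }
    }
    return live;
  }

  public static void main(String[] args) {
    // Original order: data(k1), delete(k1), data(k1) -> the re-insert wins, k1 survives.
    System.out.println(merge(Arrays.asList(
        new Block(false, "k1"), new Block(true, "k1"), new Block(false, "k1")))); // prints [k1]
    // Reordered as the comment fears: data, data, delete -> the delete wins, k1 is lost.
    System.out.println(merge(Arrays.asList(
        new Block(false, "k1"), new Block(false, "k1"), new Block(true, "k1")))); // prints []
  }
}
```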
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactHelpers.java:
##########
@@ -83,4 +83,16 @@ public void completeInflightCompaction(HoodieTable table, String compactionCommi
           "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + compactionCommitTime, e);
}
}
+
+  public void completeInflightLogCompaction(HoodieTable table, String logCompactionCommitTime, HoodieCommitMetadata commitMetadata) {
Review Comment:
Looks like this is almost a duplicate of completeInflightCompaction. Can we try to re-use code as much as possible? You can create a private method and call it from the two different methods (for compaction and log compaction). Please check other places too; there are opportunities for code reuse.
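One way to structure the reuse being asked for, sketched with simplified signatures (the real methods take the table, commit metadata, and timeline transitions; everything here is a stand-in):

```java
public class CompactHelpersReuseSketch {
  // Two thin public entry points, one per action type...
  public String completeInflightCompaction(String commitTime) {
    return completeInflight("commit", commitTime);
  }

  public String completeInflightLogCompaction(String commitTime) {
    return completeInflight("deltacommit", commitTime);
  }

  // ...both delegating to one private method that holds the shared
  // transition/commit logic, so it exists in exactly one place.
  private String completeInflight(String actionType, String commitTime) {
    return actionType + "@" + commitTime;
  }

  public static void main(String[] args) {
    CompactHelpersReuseSketch h = new CompactHelpersReuseSketch();
    System.out.println(h.completeInflightCompaction("001"));    // prints commit@001
    System.out.println(h.completeInflightLogCompaction("002")); // prints deltacommit@002
  }
}
```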
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunLogCompactionActionExecutor.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieCompactionStrategy;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.table.HoodieCompactionHandler;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+public class RunLogCompactionActionExecutor<T extends HoodieRecordPayload> extends
+    BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(RunLogCompactionActionExecutor.class);
+ private final HoodieCompactionHandler compactionHandler;
+
+ public RunLogCompactionActionExecutor(HoodieEngineContext engineContext,
Review Comment:
Again, let's see if we can re-use code. We could extend the existing RunCompactionActionExecutor class and add protected methods, etc., but let's see how we can avoid duplicated code in general.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -240,8 +245,9 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
int keepVersions = config.getCleanerFileVersionsRetained();
// do not cleanup slice required for pending compaction
     Iterator<FileSlice> fileSliceIterator =
-        fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
-    if (isFileGroupInPendingCompaction(fileGroup)) {
+        fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)
Review Comment:
Can we introduce one API, isFileSliceNeededForPendingMajorAndMinorCompaction?
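A sketch of the suggested predicate, assuming the two existing checks. File group IDs are shown as plain strings for brevity, and the method name follows the comment's suggestion (semantically it is an OR of the two checks):

```java
import java.util.HashSet;
import java.util.Set;

public class CleanPlannerSketch {
  private final Set<String> fgIdsInPendingCompaction = new HashSet<>();
  private final Set<String> fgIdsInPendingLogCompaction = new HashSet<>();

  public CleanPlannerSketch(Set<String> compaction, Set<String> logCompaction) {
    fgIdsInPendingCompaction.addAll(compaction);
    fgIdsInPendingLogCompaction.addAll(logCompaction);
  }

  boolean isFileSliceNeededForPendingCompaction(String fileId) {
    return fgIdsInPendingCompaction.contains(fileId);
  }

  boolean isFileSliceNeededForPendingLogCompaction(String fileId) {
    return fgIdsInPendingLogCompaction.contains(fileId);
  }

  // One combined API, so call sites like the cleaner filter with a single predicate.
  boolean isFileSliceNeededForPendingMajorAndMinorCompaction(String fileId) {
    return isFileSliceNeededForPendingCompaction(fileId)
        || isFileSliceNeededForPendingLogCompaction(fileId);
  }

  public static void main(String[] args) {
    Set<String> compaction = new HashSet<>(java.util.Arrays.asList("fg1"));
    Set<String> logCompaction = new HashSet<>(java.util.Arrays.asList("fg2"));
    CleanPlannerSketch p = new CleanPlannerSketch(compaction, logCompaction);
    System.out.println(p.isFileSliceNeededForPendingMajorAndMinorCompaction("fg1")); // prints true
    System.out.println(p.isFileSliceNeededForPendingMajorAndMinorCompaction("fg3")); // prints false
  }
}
```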
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionStrategy.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieCompactionHandler;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
+
+public class LogCompactionExecutionStrategy<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(LogCompactionExecutionStrategy.class);
+
+ private final HoodieTable<T, I, K, O> hoodieTable;
+ private final transient HoodieEngineContext engineContext;
+ private final HoodieWriteConfig writeConfig;
+ private final HoodieCompactionHandler compactionHandler;
+
+  public LogCompactionExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig,
+                                        HoodieCompactionHandler compactionHandler) {
+ this.writeConfig = writeConfig;
+ this.hoodieTable = table;
+ this.engineContext = engineContext;
+ this.compactionHandler = compactionHandler;
+ }
+
+  /**
+   * Executes compaction based on the stategy class provided within the compaction Plan.
+   * Note that commit is not done as part of strategy. commit is callers responsibility.
+   */
+  public HoodieData<WriteStatus> performCompaction(final HoodieCompactionPlan compactionPlan, final Schema schema,
+                                                   final String instantTime) {
+
+ HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+
+    // Here we use the table schema as the reader schema to read
+    // log file.That is because in the case of MergeInto, the config.getSchema may not
+    // the same with the table schema.
+    HoodieWriteConfig writeConfigCopy = HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ try {
+ Schema readerSchema = schemaResolver.getTableAvroSchema(false);
+ writeConfigCopy.setSchema(readerSchema.toString());
+ } catch (Exception e) {
+ // If there is no commit in the table, just ignore the exception.
+ }
+
+ // Compacting is very similar to applying updates to existing file
+    List<CompactionOperation> compactionOps = compactionPlan.getOperations().stream()
Review Comment:
I see some commonality between this and HoodieCompactor.compact(). Is there a chance of re-using code? I would like to avoid duplication if possible.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java:
##########
@@ -26,5 +26,8 @@ public enum CompactionTriggerStrategy {
// trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied
NUM_AND_TIME,
// trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied
- NUM_OR_TIME
+ NUM_OR_TIME,
+  // Always triggers. This is way to port the condition check from ScheduleCompactionActionExecutor
+  // towards the plan generators. Ideally done when there are complex condition checks.
+  ALWAYS_ALLOW
Review Comment:
May I know when this is useful? W.r.t. the newly added log compaction, we try to follow similar semantics as regular compaction, right? I.e., if min.delta.commits is set to 5, we trigger compaction once every 5 delta commits.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionStrategy.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.io.IOUtils;
+import org.apache.hudi.table.HoodieCompactionHandler;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.toList;
+
+public class LogCompactionExecutionStrategy<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(LogCompactionExecutionStrategy.class);
+
+ private final HoodieTable<T, I, K, O> hoodieTable;
+ private final transient HoodieEngineContext engineContext;
+ private final HoodieWriteConfig writeConfig;
+ private final HoodieCompactionHandler compactionHandler;
+
+  public LogCompactionExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig,
+                                        HoodieCompactionHandler compactionHandler) {
+ this.writeConfig = writeConfig;
+ this.hoodieTable = table;
+ this.engineContext = engineContext;
+ this.compactionHandler = compactionHandler;
+ }
+
+  /**
+   * Executes compaction based on the stategy class provided within the compaction Plan.
+   * Note that commit is not done as part of strategy. commit is callers responsibility.
+   */
+  public HoodieData<WriteStatus> performCompaction(final HoodieCompactionPlan compactionPlan, final Schema schema,
+                                                   final String instantTime) {
+
+ HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+
+    // Here we use the table schema as the reader schema to read
+    // log file.That is because in the case of MergeInto, the config.getSchema may not
+    // the same with the table schema.
+    HoodieWriteConfig writeConfigCopy = HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
+ try {
+ Schema readerSchema = schemaResolver.getTableAvroSchema(false);
+ writeConfigCopy.setSchema(readerSchema.toString());
+ } catch (Exception e) {
+ // If there is no commit in the table, just ignore the exception.
+ }
+
+ // Compacting is very similar to applying updates to existing file
+    List<CompactionOperation> compactionOps = compactionPlan.getOperations().stream()
+        .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
+ LOG.info("Compactions operations " + compactionOps);
+
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Log Compacting log files");
+    TaskContextSupplier taskContextSupplier = hoodieTable.getTaskContextSupplier();
+    return engineContext.parallelize(compactionOps).map(compactionOperation ->
+        logCompact(compactionHandler, metaClient, writeConfigCopy, compactionOperation, instantTime, taskContextSupplier))
+        .flatMap(List::iterator);
+ }
+
+ public List<WriteStatus> logCompact(HoodieCompactionHandler
compactionHandler,
Review Comment:
again, this also seems mostly duplicated. Would you mind pointing to the exact differences between the duplicated code blocks? I can review those blocks in detail.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunLogCompactionActionExecutor.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieCompactionStrategy;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCompactionException;
+import org.apache.hudi.table.HoodieCompactionHandler;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+public class RunLogCompactionActionExecutor<T extends HoodieRecordPayload> extends
+    BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(RunLogCompactionActionExecutor.class);
+  private final HoodieCompactionHandler compactionHandler;
+
+  public RunLogCompactionActionExecutor(HoodieEngineContext engineContext,
+                                        HoodieWriteConfig writeConfig,
+                                        HoodieTable table,
+                                        String instantTime,
+                                        HoodieCompactionHandler compactionHandler) {
+    super(engineContext, writeConfig, table, instantTime);
+    this.compactionHandler = compactionHandler;
+  }
+
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    try {
+      HoodieInstant requestedInstant = HoodieTimeline.getLogCompactionRequestedInstant(instantTime);
+      // Mark instant as log compaction inflight
+      table.getActiveTimeline().transitionLogCompactionRequestedToInflight(requestedInstant);
+      table.getMetaClient().reloadActiveTimeline();
+
+      // Get log compaction action plan
+      HoodieCompactionPlan compactionPlan =
+          CompactionUtils.getLogCompactionPlan(table.getMetaClient(), instantTime);
+
+      if (compactionPlan == null) {
+        throw new HoodieCompactionException("Log compaction action cannot be performed since there is no log compaction plan.");
+      }
+
+      // Get the log compaction action executor.
+      HoodieCompactionStrategy compactionStrategy = compactionPlan.getStrategy();
+      if (compactionStrategy == null || StringUtils.isNullOrEmpty(compactionStrategy.getCompactorClassName())) {
+        throw new HoodieCompactionException("Log compaction action cannot be performed since there is no strategy class.");
+      }
+
+      final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
+
+      HoodieData<WriteStatus> writeStatus =
+          ((LogCompactionExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>)
+              ReflectionUtils.loadClass(compactionStrategy.getCompactorClassName(), new Class<?>[]
Review Comment:
RunCompactionActionExecutor has some code which handles schema evolution:
```
// try to load internalSchema to support schema evolution
HoodieWriteConfig configCopy = config;
Pair<Option<String>, Option<String>> schemaPair = InternalSchemaCache
    .getInternalSchemaAndAvroSchemaForClusteringAndCompaction(table.getMetaClient(), instantTime);
if (schemaPair.getLeft().isPresent() && schemaPair.getRight().isPresent()) {
  // should not influence the original config, just copy it
  configCopy = HoodieWriteConfig.newBuilder().withProperties(config.getProps()).build();
  configCopy.setInternalSchemaString(schemaPair.getLeft().get());
  configCopy.setSchema(schemaPair.getRight().get());
}
```
I don't see that being handled here. Can you check on that please?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.plan.generators;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieCompactionStrategy;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.LogCompactionExecutionStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
+
+public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieCompactionPlanGenerator {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieLogCompactionPlanGenerator.class);
+
+  public HoodieLogCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+ /**
+ * Generate a new log compaction plan for scheduling.
+ * @return Log Compaction Plan
+ * @throws java.io.IOException when encountering errors
+ */
+ @Override
+ public HoodieCompactionPlan generateCompactionPlan() {
+
+    // While scheduling log compaction (i.e. minor compaction) make sure only one log compaction is scheduled for a latest file slice.
+    // Major compaction anyway will take care of creating a new base file, so if there is a pending compaction then log compaction
+    // need not be scheduled for previous file slice.
+    // Therefore, log compaction will only be scheduled for latest file slice or always for last file slice.
+    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) this.hoodieTable.getSliceView();
+
+    // Accumulator to keep track of total log files for a table
+    HoodieAccumulator totalLogFiles = this.engineContext.newAccumulator();
+    // Accumulator to keep track of total log file slices for a table
+    HoodieAccumulator totalFileSlices = this.engineContext.newAccumulator();
+
+    HoodieTableMetaClient metaClient = this.hoodieTable.getMetaClient();
+
+    // Filter partition paths.
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(this.engineContext, writeConfig.getMetadataConfig(),
+        metaClient.getBasePath());
+
+    // Compaction Strategy should be SpecificPartitionCompactionStrategy to run a log compaction on a specified partition.
+    partitionPaths = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitionPaths);
+
+    // Collect all pending compaction file groups
+    Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
+        .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
+        .collect(Collectors.toSet());
+
+    // Collect all pending log compaction file groups
+    fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations()
+        .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
+        .collect(Collectors.toSet()));
+
+    // Collect all pending clustering file groups
+    fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering()
+        .map(Pair::getLeft).collect(Collectors.toSet()));
+
+    String maxInstantTime = hoodieTable.getMetaClient()
+        .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
+            HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
+
+    // Here two different filters are applied before scheduling log compaction.
+    // 1. Exclude all the file groups which are either part of a pending compaction or clustering plans.
Review Comment:
minor: exclude all file groups part of pending compaction, pending minor log compaction, and pending clustering plans.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.plan.generators;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieCompactionStrategy;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.LogCompactionExecutionStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
+
+public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieCompactionPlanGenerator {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieLogCompactionPlanGenerator.class);
+
+  public HoodieLogCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+ /**
+ * Generate a new log compaction plan for scheduling.
+ * @return Log Compaction Plan
+ * @throws java.io.IOException when encountering errors
+ */
+ @Override
+ public HoodieCompactionPlan generateCompactionPlan() {
+
+    // While scheduling log compaction (i.e. minor compaction) make sure only one log compaction is scheduled for a latest file slice.
+    // Major compaction anyway will take care of creating a new base file, so if there is a pending compaction then log compaction
+    // need not be scheduled for previous file slice.
+    // Therefore, log compaction will only be scheduled for latest file slice or always for last file slice.
+    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) this.hoodieTable.getSliceView();
+
+    // Accumulator to keep track of total log files for a table
+    HoodieAccumulator totalLogFiles = this.engineContext.newAccumulator();
+    // Accumulator to keep track of total log file slices for a table
+    HoodieAccumulator totalFileSlices = this.engineContext.newAccumulator();
+
+    HoodieTableMetaClient metaClient = this.hoodieTable.getMetaClient();
+
+    // Filter partition paths.
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(this.engineContext, writeConfig.getMetadataConfig(),
+        metaClient.getBasePath());
+
+    // Compaction Strategy should be SpecificPartitionCompactionStrategy to run a log compaction on a specified partition.
+    partitionPaths = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitionPaths);
+
+    // Collect all pending compaction file groups
+    Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
+        .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
+        .collect(Collectors.toSet());
+
+    // Collect all pending log compaction file groups
+    fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations()
+        .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
+        .collect(Collectors.toSet()));
+
+    // Collect all pending clustering file groups
+    fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering()
+        .map(Pair::getLeft).collect(Collectors.toSet()));
+
+    String maxInstantTime = hoodieTable.getMetaClient()
+        .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
+            HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
+
+    // Here two different filters are applied before scheduling log compaction.
+    // 1. Exclude all the file groups which are either part of a pending compaction or clustering plans.
+    // 2. Check if FileSlices are meeting the criteria for LogCompaction.
+    List<HoodieCompactionOperation> operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
+        .getLatestFileSlices(partitionPath)
+        .filter(fileSlice -> !fgIdsInPendingCompactionAndClustering.contains(fileSlice.getFileGroupId()))
Review Comment:
Are we fixing the scheduling of regular compaction?
i.e. if a minor log compaction is scheduled for a file group, will we avoid scheduling regular compaction?
Here I see that the vice versa holds good: if regular compaction is scheduled, we will never schedule log compaction, which is good.
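To make the intended mutual exclusion concrete, here is a toy sketch of the filter in plain Java. String ids stand in for Hudi's `HoodieFileGroupId` streams; the class and method names are invented for illustration, not Hudi's actual API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the scheduling filter: exclude file groups that are already
// part of a pending compaction, pending log compaction, or pending clustering plan.
class SchedulingFilter {
    static List<String> eligibleFileGroups(List<String> latestFileGroups,
                                           Set<String> pendingCompaction,
                                           Set<String> pendingLogCompaction,
                                           Set<String> pendingClustering) {
        // Union the three pending sets, mirroring how the plan generator
        // accumulates ids into fgIdsInPendingCompactionAndClustering.
        Set<String> excluded = new HashSet<>(pendingCompaction);
        excluded.addAll(pendingLogCompaction);
        excluded.addAll(pendingClustering);
        List<String> eligible = new ArrayList<>();
        for (String fg : latestFileGroups) {
            if (!excluded.contains(fg)) {
                eligible.add(fg);
            }
        }
        return eligible;
    }
}
```

The exclusion only holds in both directions if the regular compaction scheduler consults the same pending-log-compaction set, which is the point the comment above is probing.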
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java:
##########
@@ -147,6 +152,21 @@ public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEng
     return new SparkBootstrapDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
   }
+  @Override
+  public Option<HoodieCompactionPlan> scheduleLogCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
+    ScheduleCompactionActionExecutor scheduleLogCompactionExecutor = new ScheduleCompactionActionExecutor(
Review Comment:
we can fold L157 and L158 into a single line.
##########
hudi-common/src/main/avro/HoodieCompactionOperation.avsc:
##########
@@ -84,6 +84,18 @@
"name":"version",
"type":["int", "null"],
"default": 1
+ },
+ {
+ "name":"strategy",
+ "type":[
+ "null", "HoodieCompactionStrategy"
+ ],
+ "default": null
+ },
+ {
+ "name":"preserveHoodieMetadata",
+ "type":["boolean", "null"],
+ "default": false
Review Comment:
can we have null as the first entry in the union?
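For reference, Avro requires a union field's default value to match the first branch of the union, so putting `null` first also implies changing the default. A sketch of how the `preserveHoodieMetadata` field could look with `null` leading (illustrative only):

```
{
  "name": "preserveHoodieMetadata",
  "type": ["null", "boolean"],
  "default": null
}
```

Keeping `"boolean"` first is the other consistent option, since `"default": false` matches a leading `boolean` branch; `["boolean", "null"]` with `false` is valid, while `["null", "boolean"]` with `false` is not.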
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -379,14 +435,29 @@ static HoodieInstant getIndexInflightInstant(final String timestamp) {
   /**
    * Returns the inflight instant corresponding to the instant being passed. Takes care of changes in action names
-   * between inflight and completed instants (compaction <=> commit).
+   * between inflight and completed instants (compaction <=> commit) and (logcompaction <==> deltacommit).
    * @param instant Hoodie Instant
-   * @param tableType Hoodie Table Type
+   * @param metaClient Hoodie metaClient to fetch tableType and fileSystem.
    * @return Inflight Hoodie Instant
    */
-  static HoodieInstant getInflightInstant(final HoodieInstant instant, final HoodieTableType tableType) {
-    if ((tableType == HoodieTableType.MERGE_ON_READ) && instant.getAction().equals(COMMIT_ACTION)) {
-      return new HoodieInstant(true, COMPACTION_ACTION, instant.getTimestamp());
+  static HoodieInstant getInflightInstant(final HoodieInstant instant, final HoodieTableMetaClient metaClient) {
+    if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
+      if (instant.getAction().equals(COMMIT_ACTION)) {
+        return new HoodieInstant(true, COMPACTION_ACTION, instant.getTimestamp());
+      } else if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
+        // Deltacommit is used by both ingestion and logcompaction.
+        // So, to distinguish between them, check for the inflight file being present.
+        FileSystem fs = metaClient.getFs();
+        String logCompactionRequestedFile = instant.getTimestamp() + "logcompaction.requested";
+        Path path = new Path(metaClient.getMetaPath(), logCompactionRequestedFile);
+        try {
+          if (fs.exists(path)) {
Review Comment:
we have put in a lot of effort to avoid direct fs calls in the data table in general, so let's see if we can avoid fs.exists() here. If the metaClient holds all timeline instants, can we check for the specific instant instead?
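One possible shape for that suggestion, as a toy sketch: if the already-loaded timeline carries its instant file names, the check becomes a set lookup rather than a filesystem call. Everything below is illustrative; Hudi's actual timeline API differs.

```java
import java.util.Set;

// Toy sketch: answer "is a log compaction requested at this timestamp?" from the
// timeline's in-memory instant file names instead of an fs.exists() round trip.
class TimelineInstants {
    static boolean isLogCompactionRequested(Set<String> instantFiles, String timestamp) {
        // Membership test against the loaded timeline; no filesystem access.
        return instantFiles.contains(timestamp + ".logcompaction.requested");
    }
}
```

The design benefit is the same one the comment alludes to: the timeline is loaded once per meta client, so repeated lookups cost nothing extra, whereas each `fs.exists()` is a storage round trip.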
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -151,6 +173,19 @@ public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaC
     return migrator.upgradeToLatest(compactionPlan, compactionPlan.getVersion());
   }
+  /**
+   * This method will serve only log compaction instants,
+   * because we use the same HoodieCompactionPlan for both operations.
+   */
+  public static HoodieCompactionPlan getLogCompactionPlan(HoodieTableMetaClient metaClient,
+                                                          String logCompactionInstant) throws IOException {
+    HoodieInstant compactionRequestedInstant = HoodieTimeline.getLogCompactionRequestedInstant(logCompactionInstant);
+    CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient);
Review Comment:
There won't be multiple versions in the case of log compaction plans, right? Do we still need this migrator?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.plan.generators;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieCompactionStrategy;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.LogCompactionExecutionStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
+
+public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieCompactionPlanGenerator {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieLogCompactionPlanGenerator.class);
+
+  public HoodieLogCompactionPlanGenerator(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
+    super(table, engineContext, writeConfig);
+  }
+
+ /**
+ * Generate a new log compaction plan for scheduling.
+ * @return Log Compaction Plan
+ * @throws java.io.IOException when encountering errors
+ */
+ @Override
+ public HoodieCompactionPlan generateCompactionPlan() {
+
+    // While scheduling log compaction (i.e. minor compaction) make sure only one log compaction is scheduled for a latest file slice.
+    // Major compaction anyway will take care of creating a new base file, so if there is a pending compaction then log compaction
+    // need not be scheduled for previous file slice.
+    // Therefore, log compaction will only be scheduled for latest file slice or always for last file slice.
+    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) this.hoodieTable.getSliceView();
+
+    // Accumulator to keep track of total log files for a table
+    HoodieAccumulator totalLogFiles = this.engineContext.newAccumulator();
+    // Accumulator to keep track of total log file slices for a table
+    HoodieAccumulator totalFileSlices = this.engineContext.newAccumulator();
+
+    HoodieTableMetaClient metaClient = this.hoodieTable.getMetaClient();
+
+    // Filter partition paths.
+    List<String> partitionPaths = FSUtils.getAllPartitionPaths(this.engineContext, writeConfig.getMetadataConfig(),
+        metaClient.getBasePath());
+
+    // Compaction Strategy should be SpecificPartitionCompactionStrategy to run a log compaction on a specified partition.
+    partitionPaths = writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, partitionPaths);
+
+    // Collect all pending compaction file groups
+    Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
+        .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
+        .collect(Collectors.toSet());
+
+    // Collect all pending log compaction file groups
+    fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations()
+        .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
+        .collect(Collectors.toSet()));
+
+    // Collect all pending clustering file groups
+    fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering()
+        .map(Pair::getLeft).collect(Collectors.toSet()));
+
+    String maxInstantTime = hoodieTable.getMetaClient()
+        .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
+            HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
+
+    // Here two different filters are applied before scheduling log compaction.
+    // 1. Exclude all the file groups which are either part of a pending compaction or clustering plans.
+    // 2. Check if FileSlices are meeting the criteria for LogCompaction.
+    List<HoodieCompactionOperation> operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
+        .getLatestFileSlices(partitionPath)
+        .filter(fileSlice -> !fgIdsInPendingCompactionAndClustering.contains(fileSlice.getFileGroupId()))
+        .filter(fileSlice -> isFileSliceEligibleForLogCompaction(fileSlice, maxInstantTime))
+        .map(fileSlice -> {
+          List<HoodieLogFile> logFiles =
+              fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList());
+          totalLogFiles.add(logFiles.size());
+          totalFileSlices.add(1);
+          Option<HoodieBaseFile> dataFile = fileSlice.getBaseFile();
+          return new CompactionOperation(dataFile, fileSlice.getPartitionPath(), logFiles,
+              writeConfig.getCompactionStrategy().captureMetrics(writeConfig, fileSlice));
+        })
+        .filter(c -> !c.getDeltaFileNames().isEmpty()), partitionPaths.size()).stream()
+        .map(CompactionUtils::buildHoodieCompactionOperation).collect(Collectors.toList());
+
+    if (operations.isEmpty()) {
+      LOG.warn("After filtering, nothing to log compact for " + metaClient.getBasePath());
+      return null;
+    }
+
+    LOG.info("Total of " + operations.size() + " log compaction operations are retrieved");
+    LOG.info("Total number of latest file slices " + totalFileSlices.value());
+    LOG.info("Total number of log files " + totalLogFiles.value());
+
+    HoodieCompactionStrategy compactionStrategy = HoodieCompactionStrategy.newBuilder()
Review Comment:
Same comment as above: please de-duplicate this code.
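As a hypothetical illustration of the de-duplication being asked for, the shared plan-building could live in the base generator while subclasses supply only what differs (the eligibility check and the strategy class name). The class and method names below are invented for the sketch and are not Hudi's actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical: common plan-building hoisted into a base class; the compaction
// and log compaction generators override only the parts that vary.
abstract class BasePlanSketch {
    List<String> buildPlan(List<String> fileSlices) {
        List<String> operations = new ArrayList<>();
        for (String slice : fileSlices) {
            if (eligibility().test(slice)) {
                operations.add(slice + " -> " + strategyClassName());
            }
        }
        return operations;
    }

    abstract Predicate<String> eligibility();

    abstract String strategyClassName();
}

class LogCompactionPlanSketch extends BasePlanSketch {
    // Log compaction only considers slices that still have log files.
    Predicate<String> eligibility() { return s -> s.endsWith(".log"); }

    String strategyClassName() { return "LogCompactionExecutionStrategy"; }
}
```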
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java:
##########
@@ -147,6 +152,21 @@ public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEng
     return new SparkBootstrapDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
   }
+  @Override
+  public Option<HoodieCompactionPlan> scheduleLogCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
+    ScheduleCompactionActionExecutor scheduleLogCompactionExecutor = new ScheduleCompactionActionExecutor(
+        context, config, this, instantTime, extraMetadata, WriteOperationType.LOG_COMPACT);
+    return scheduleLogCompactionExecutor.execute();
+  }
+
+  @Override
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> logCompact(
+      HoodieEngineContext context, String logCompactionInstantTime) {
+    RunLogCompactionActionExecutor logCompactionExecutor = new RunLogCompactionActionExecutor(
Review Comment:
same here.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -369,7 +370,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
   protected void rollbackFailedBootstrap() {
     LOG.info("Rolling back pending bootstrap if present");
    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
-    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
+    HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction();
Review Comment:
Another option to consider for naming: FullFileSliceCompaction / PartialFileSliceCompaction.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java:
##########
@@ -314,6 +334,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("When enabled, hoodie will auto merge several small archive files into larger ones. It's"
           + " useful when the storage scheme doesn't support append operations.");
+  public static final ConfigProperty<String> LOG_COMPACTION_BLOCKS_THRESHOLD = ConfigProperty
+      .key("hoodie.log.compaction.blocks.threshold")
+      .defaultValue("5")
Review Comment:
https://issues.apache.org/jira/browse/HUDI-3580
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -67,20 +67,58 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
           + lostPendingCompactions);
       return TimelineDiffResult.UNSAFE_SYNC_RESULT;
     }
+
+    // Rather than checking for compaction instants with completed check and a commit file,
Review Comment:
Trying to see if this is a mandatory fix. Can we leave it as is? Trying not to make any changes to the core flow.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java:
##########
@@ -161,6 +181,17 @@ public Option<HoodieRollbackPlan>
scheduleRollback(HoodieEngineContext context,
shouldRollbackUsingMarkers).execute();
}
+ @Override
+ public Iterator<List<WriteStatus>> handlePreppedInserts(String instantTime, String partitionPath, String fileId,
Review Comment:
Somehow the naming does not sit well. We already have insert, upsert, etc. in HoodieTable; adding preppedInserts just for the purpose of log compaction does not look seamless.
Can we rename it to something that ties it to log compaction specifically?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/SpecificPartitionCompactionStrategy.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.strategy;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class SpecificPartitionCompactionStrategy extends CompactionStrategy {
Review Comment:
How about `PartitionBasedCompactionStrategy`?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/DataValidationCheckForLogCompactionActions.java:
##########
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
+import org.apache.hudi.testutils.GenericRecordValidationTestUtils;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
+import org.apache.hudi.testutils.MetadataMergeWriteStatus;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.model.HoodieRecord.COMMIT_SEQNO_METADATA_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertGenericRecords;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DataValidationCheckForLogCompactionActions extends HoodieClientTestBase {
Review Comment:
Typically we prefix the test class name with "Test".
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -1146,7 +1202,7 @@ public HoodieTimeline getTimeline() {
@Override
public void sync() {
HoodieTimeline oldTimeline = getTimeline();
- HoodieTimeline newTimeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
+ HoodieTimeline newTimeline = metaClient.reloadActiveTimeline().filterCompletedInstantsOrRewriteTimeline();
Review Comment:
Not sure why this was renamed. Also curious why clustering was added to the filtering list. Prior to this patch, the method filtered for any completed instant or compaction; with this patch, I would expect any completed instant, or compaction, or log compaction.
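To make the expected behavior concrete, here is a hedged, self-contained model of the filter this comment describes (the `Instant` class and action strings are simplified stand-ins, not Hudi's actual `HoodieTimeline`/`HoodieInstant` API): keep any completed instant, plus pending compaction and log compaction instants.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Simplified model of the filtering described above; Instant is a
// stand-in for HoodieInstant, and the action strings only loosely
// mirror Hudi's "compaction" / "logcompaction" actions.
public class TimelineFilterModel {
  static class Instant {
    final String action;
    final boolean completed;
    Instant(String action, boolean completed) {
      this.action = action;
      this.completed = completed;
    }
  }

  // Keep completed instants, plus pending (log) compaction instants.
  static List<Instant> filterCompletedAndMajorMinorCompaction(List<Instant> timeline) {
    return timeline.stream()
        .filter(i -> i.completed
            || i.action.equals("compaction")
            || i.action.equals("logcompaction"))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Instant> timeline = Arrays.asList(
        new Instant("commit", true),          // completed: kept
        new Instant("deltacommit", false),    // pending delta commit: dropped
        new Instant("compaction", false),     // pending compaction: kept
        new Instant("logcompaction", false)); // pending log compaction: kept
    System.out.println(filterCompletedAndMajorMinorCompaction(timeline).size()); // 3
  }
}
```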
##########
hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java:
##########
@@ -183,6 +218,40 @@ public static Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> ge
return fgIdToPendingCompactionWithInstantMap;
}
+ /**
+ * Get all partition + file Ids with pending Log Compaction operations and their target log compaction instant time.
+ */
+ public static Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> getAllPendingLogCompactionOperations(
Review Comment:
Let's see if we can re-use code here.
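One possible shape for that re-use, sketched with simplified stand-in types rather than Hudi's actual `CompactionUtils` signatures: a single helper parameterized by how pending plans are looked up, so compaction and log compaction don't need near-duplicate methods.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hedged sketch of the re-use suggestion: instead of parallel
// getAllPendingCompactionOperations / getAllPendingLogCompactionOperations
// methods, one shared helper takes a function that loads the file groups
// in each pending plan. String keys stand in for HoodieFileGroupId etc.
public class PendingOperations {
  static Map<String, String> getAllPendingOperations(
      List<String> pendingInstants,
      Function<String, List<String>> fileGroupsInPlan) {
    Map<String, String> fileGroupToInstant = new HashMap<>();
    for (String instant : pendingInstants) {
      for (String fg : fileGroupsInPlan.apply(instant)) {
        fileGroupToInstant.put(fg, instant); // map each file group to its target instant
      }
    }
    return fileGroupToInstant;
  }

  public static void main(String[] args) {
    // The same helper serves compaction and log compaction by swapping
    // the plan-loading function.
    Map<String, String> ops = getAllPendingOperations(
        List.of("001"), instant -> List.of("partition/fg-1"));
    System.out.println(ops.get("partition/fg-1")); // 001
  }
}
```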
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java:
##########
@@ -36,7 +36,7 @@ public class HoodieCommandBlock extends HoodieLogBlock {
* Hoodie command block type enum.
*/
public enum HoodieCommandBlockTypeEnum {
- ROLLBACK_PREVIOUS_BLOCK
+ ROLLBACK_BLOCK
Review Comment:
Looks like we use the ordinal and not the name, so we should be good.
@suryaprasanna: can you also confirm this?
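A quick sketch of why the rename is safe under that assumption (the two enums below are hypothetical before/after stand-ins for `HoodieCommandBlockTypeEnum`, not Hudi code): if only the ordinal is serialized, renaming the constant is backward compatible as long as its position is unchanged.

```java
// Illustrates the point above: a value written as an ordinal by an old
// writer resolves correctly against the renamed enum, because
// Enum.values()[ordinal] looks up by position, not by name.
public class OrdinalRenameCheck {
  enum Before { ROLLBACK_PREVIOUS_BLOCK }
  enum After { ROLLBACK_BLOCK }

  public static void main(String[] args) {
    int stored = Before.ROLLBACK_PREVIOUS_BLOCK.ordinal(); // what an old writer recorded
    After decoded = After.values()[stored];                // how a new reader resolves it
    System.out.println(decoded); // ROLLBACK_BLOCK
  }
}
```

Note the converse: resolving by name (`Enum.valueOf`) would break on rename, which is why this only holds if the ordinal is what's persisted.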
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]