vinothchandar commented on a change in pull request #2048:
URL: https://github.com/apache/hudi/pull/2048#discussion_r487541865
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -95,6 +93,13 @@ public HoodieWriteMetadata execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
       saveWorkloadProfileMetadataToInflight(profile, instantTime);
     }
+    JavaRDD<WriteStatus> writeStatusRDD = processInputRecords(inputRecordsRDD, profile);
+    HoodieWriteMetadata result = new HoodieWriteMetadata();
+    updateIndexAndCommitIfNeeded(writeStatusRDD, result);
+    return result;
+  }
+
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> inputRecordsRDD, WorkloadProfile profile) {
Review comment:
rename: writeInputRecords()
##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -213,6 +213,12 @@ public abstract HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String i
   public abstract HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
       JavaRDD<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner> bulkInsertPartitioner);
+  /**
+   * Logically delete all existing records and Insert a batch of new records into Hoodie table at the supplied instantTime.
+   */
+  public abstract HoodieWriteMetadata insertOverwrite(JavaSparkContext jsc, String instantTime,
Review comment:
I think at the HoodieTable level, the API has to be about replacing file
groups and not insertOverwrite (which can be limited to the WriteClient level).
This way clustering can also use the same method, to build on top.
##########
File path:
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -378,9 +398,9 @@ public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInst
         new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
     FileSystem fs = FSUtils.getFs(basePath, configuration);
     try (FSDataOutputStream os = fs.create(commitFile, true)) {
-      HoodieCompactionPlan workload = new HoodieCompactionPlan();
+      HoodieCompactionPlan workload = HoodieCompactionPlan.newBuilder().setVersion(1).build();
Review comment:
why is this change necessary?
##########
File path:
hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
##########
@@ -102,7 +104,9 @@ public void commit(WriterCommitMessage[] messages) {
         .flatMap(m -> m.getWriteStatuses().stream().map(m2 -> m2.getStat())).collect(Collectors.toList());
     try {
-      writeClient.commitStats(instantTime, writeStatList, Option.empty());
+      writeClient.commitStats(instantTime, writeStatList, Option.empty(),
Review comment:
A better approach for these situations generally is to introduce a `commitStats(..)` overload that does not take the last argument and deals with it internally inside WriteClient. This way the code remains more readable, without having to reference replace stats in bulk_insert, which have nothing to do with each other. Hope that makes sense.
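A minimal sketch of that suggestion, with illustrative names and a plain `String` standing in for `HoodieWriteStat`: the shorter overload hides the replace-stats argument inside the client, so call sites like bulk_insert never mention it.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class WriteClientSketch {
    // Full-argument variant, as in the PR (body stubbed out here).
    boolean commitStats(String instantTime, List<String> stats,
                        Optional<Map<String, String>> extraMetadata,
                        String actionType, List<String> replaceStats) {
        return replaceStats != null;
    }

    // Suggested convenience overload: deals with replaceStats internally
    // by passing an empty list, keeping ordinary call sites clean.
    boolean commitStats(String instantTime, List<String> stats,
                        Optional<Map<String, String>> extraMetadata,
                        String actionType) {
        return commitStats(instantTime, stats, extraMetadata, actionType,
                Collections.emptyList());
    }
}
```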
##########
File path:
hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -87,44 +88,57 @@ protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, Hoo
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
-    return commit(instantTime, writeStatuses, Option.empty());
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, Option.empty(), actionType);
+  }
+
+  /**
+   * Complete changes performed at the given instantTime marker with specified action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, String commitActionType) {
+    return commit(instantTime, writeStatuses, Option.empty(), commitActionType);
   }
   /**
+   *
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) {
-    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, stats, extraMetadata);
+      Option<Map<String, String>> extraMetadata) {
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, extraMetadata, actionType);
   }
-  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata) {
-    LOG.info("Committing " + instantTime);
+  /**
+   * Complete changes performed at the given instantTime marker with specified action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
Review comment:
note to self: let's see if we can simplify the commit() overloaded methods.
##########
File path:
hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -87,44 +88,57 @@ protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, Hoo
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
-    return commit(instantTime, writeStatuses, Option.empty());
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, Option.empty(), actionType);
+  }
+
+  /**
+   * Complete changes performed at the given instantTime marker with specified action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, String commitActionType) {
+    return commit(instantTime, writeStatuses, Option.empty(), commitActionType);
   }
   /**
+   *
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) {
-    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, stats, extraMetadata);
+      Option<Map<String, String>> extraMetadata) {
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, extraMetadata, actionType);
   }
-  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata) {
-    LOG.info("Committing " + instantTime);
+  /**
+   * Complete changes performed at the given instantTime marker with specified action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
+      Option<Map<String, String>> extraMetadata, String commitActionType) {
+    List<HoodieWriteStat> writeStats = writeStatuses.filter(w -> !w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    List<HoodieWriteStat> replaceStats = writeStatuses.filter(w -> w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, replaceStats);
+  }
+
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
+                             String commitActionType, List<HoodieWriteStat> replaceStats) {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieTableMetaClient metaClient = createMetaClient(false);
-    String actionType = metaClient.getCommitActionType();
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
-
+    HoodieCommitMetadata metadata = CommitUtils.buildWriteActionMetadata(stats, replaceStats, extraMetadata, operationType, config.getSchema(), commitActionType);
Review comment:
rename: to just buildMetadata(); "Commit" is already implicit from the context.
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -204,41 +212,44 @@ protected void commitOnAutoCommit(HoodieWriteMetadata result) {
   }
   protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
-    commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
+    List<HoodieWriteStat> writeStats = result.getWriteStatuses().filter(w -> !w.isReplacedFileId()).map(WriteStatus::getStat).collect();
Review comment:
note to self: see if there is a way to avoid repeating this filtering
here again
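One hedged way to avoid repeating the predicate at each call site (using a stand-in type, not Hudi's WriteStatus): partition the statuses once and read both halves out of the result.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Minimal stand-in for Hudi's WriteStatus, carrying only the flag we need.
class WriteStatusStub {
    private final boolean replacedFileId;

    WriteStatusStub(boolean replacedFileId) {
        this.replacedFileId = replacedFileId;
    }

    boolean isReplacedFileId() {
        return replacedFileId;
    }
}

class StatSplitter {
    // Single pass over the statuses: key true -> replaced, key false -> regular.
    static Map<Boolean, List<WriteStatusStub>> split(List<WriteStatusStub> statuses) {
        return statuses.stream()
                .collect(Collectors.partitioningBy(WriteStatusStub::isReplacedFileId));
    }
}
```

On an RDD the same idea would need either two filters or a keyed grouping, but for the collected `List` both halves come from one traversal.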
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -204,41 +212,44 @@ protected void commitOnAutoCommit(HoodieWriteMetadata result) {
   }
   protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
-    commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
+    List<HoodieWriteStat> writeStats = result.getWriteStatuses().filter(w -> !w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    List<HoodieWriteStat> replacedStats = result.getWriteStatuses().filter(w -> w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    commit(extraMetadata, result, writeStats, replacedStats);
   }
-  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result, List<HoodieWriteStat> stats) {
-    String actionType = table.getMetaClient().getCommitActionType();
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result, List<HoodieWriteStat> writeStats, List<HoodieWriteStat> replaceStats) {
+    String actionType = getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType);
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
     result.setCommitted(true);
-    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
-    result.setWriteStats(stats);
+    result.setWriteStats(writeStats);
+    result.setReplaceStats(replaceStats);
     // Finalize write
-    finalizeWrite(instantTime, stats, result);
-
-    // add in extra metadata
-    if (extraMetadata.isPresent()) {
-      extraMetadata.get().forEach(metadata::addMetadata);
-    }
-    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, getSchemaToStoreInCommit());
-    metadata.setOperationType(operationType);
+    finalizeWrite(instantTime, writeStats, result);
     try {
-      activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-      LOG.info("Committed " + instantTime);
+      HoodieCommitMetadata metadata = writeInstant(writeStats, replaceStats, extraMetadata);
+      result.setCommitMetadata(Option.of(metadata));
     } catch (IOException e) {
       throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
           e);
     }
-    result.setCommitMetadata(Option.of(metadata));
+  }
+
+  private HoodieCommitMetadata writeInstant(List<HoodieWriteStat> writeStats, List<HoodieWriteStat> replaceStats, Option<Map<String, String>> extraMetadata) throws IOException {
Review comment:
rename: completeInstant(); it's better to stay close to what the method is doing and stick to one terminology.
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
   }
 }
+  private void deleteReplacedFiles(HoodieInstant instant) {
Review comment:
rename: deleteReplacedFileGroups(), to be consistent with our terminology.
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
##########
@@ -204,41 +212,44 @@ protected void commitOnAutoCommit(HoodieWriteMetadata result) {
   }
   protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) {
-    commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
+    List<HoodieWriteStat> writeStats = result.getWriteStatuses().filter(w -> !w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    List<HoodieWriteStat> replacedStats = result.getWriteStatuses().filter(w -> w.isReplacedFileId()).map(WriteStatus::getStat).collect();
+    commit(extraMetadata, result, writeStats, replacedStats);
   }
-  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result, List<HoodieWriteStat> stats) {
-    String actionType = table.getMetaClient().getCommitActionType();
+  protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result, List<HoodieWriteStat> writeStats, List<HoodieWriteStat> replaceStats) {
+    String actionType = getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType);
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
-    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
     result.setCommitted(true);
-    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
-    result.setWriteStats(stats);
+    result.setWriteStats(writeStats);
+    result.setReplaceStats(replaceStats);
     // Finalize write
-    finalizeWrite(instantTime, stats, result);
-
-    // add in extra metadata
-    if (extraMetadata.isPresent()) {
-      extraMetadata.get().forEach(metadata::addMetadata);
-    }
-    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, getSchemaToStoreInCommit());
-    metadata.setOperationType(operationType);
Review comment:
note to self: need to ensure the operation type is properly set
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
}
}
+ private void deleteReplacedFiles(HoodieInstant instant) {
+ if (!instant.isCompleted()) {
+ // only delete files for completed instants
+ return;
+ }
+
+ TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+ Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+ .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+ fileGroupsToDelete.forEach(fg -> {
Review comment:
we need to do the deletion in parallel.
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
}
}
+ private void deleteReplacedFiles(HoodieInstant instant) {
+ if (!instant.isCompleted()) {
+ // only delete files for completed instants
+ return;
+ }
+
+ TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+ Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+ .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+ fileGroupsToDelete.forEach(fg -> {
+ fg.getAllRawFileSlices().forEach(fileSlice -> {
+        fileSlice.getBaseFile().map(baseFile -> deletePath(baseFile.getFileStatus().getPath(), instant));
+        fileSlice.getLogFiles().forEach(logFile -> deletePath(logFile.getPath(), instant));
+ });
+ });
+ }
+
+ /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, TableFileSystemView fileSystemView) {
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+ replaceInstantOption.ifPresent(replaceInstant -> {
Review comment:
This seems like a check for whether the instant is a replacecommit or not. If the instant time is a completed instant of replacecommit type, then we must find the instant here, right?
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
   }
 }
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
+      // only delete files for completed instants
+      return;
+    }
+
+    TableFileSystemView fileSystemView = this.table.getFileSystemView();
+    ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+    Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
+        .getReplacedFileGroupsBeforeOrOn(instant.getTimestamp());
+
+    fileGroupsToDelete.forEach(fg -> {
+      fg.getAllRawFileSlices().forEach(fileSlice -> {
+        fileSlice.getBaseFile().map(baseFile -> deletePath(baseFile.getFileStatus().getPath(), instant));
+        fileSlice.getLogFiles().forEach(logFile -> deletePath(logFile.getPath(), instant));
+      });
+    });
+  }
+
+  /**
+   * Because we are creating new 'HoodieTable' and FileSystemView objects in this class constructor,
+   * partition view may not be loaded correctly.
+   * Reload all partitions modified by REPLACE action
+   *
+   * TODO find a better way to pass the FileSystemView to this class.
+   */
+  private void ensureReplacedPartitionsLoadedCorrectly(HoodieInstant instant, TableFileSystemView fileSystemView) {
+    Option<HoodieInstant> replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
+        .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant();
+
+    replaceInstantOption.ifPresent(replaceInstant -> {
+      try {
+        HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
+            metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
+            HoodieReplaceCommitMetadata.class);
+
+        metadata.getPartitionToReplaceStats().keySet().forEach(partition -> fileSystemView.getAllFileGroups(partition));
+      } catch (IOException e) {
+        throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
+      }
+ }
+ });
+ }
+
+ private boolean deletePath(Path path, HoodieInstant instant) {
+ try {
+ LOG.info("Deleting " + path + " before archiving " + instant);
+ metaClient.getFs().delete(path);
Review comment:
You probably don't want to fetch the fs object each time? Also, let's delete in parallel from the get-go; we will invariably need to do this, much like parallelizing cleaning.
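A hedged sketch of both points, using a plain thread pool as a stand-in for Spark parallelism: the delete function (standing in for a single cached FileSystem handle) is created once by the caller and shared across all parallel deletes, instead of being fetched per path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

class ParallelDeleter {
    // Deletes all paths in parallel; returns how many deletes reported success.
    // fsDelete stands in for fs::delete on a FileSystem obtained once upfront.
    static int deleteAll(List<String> paths, Predicate<String> fsDelete, int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<Boolean>> futures = new ArrayList<>();
            for (String p : paths) {
                futures.add(pool.submit(() -> fsDelete.test(p)));
            }
            int deleted = 0;
            for (Future<Boolean> f : futures) {
                try {
                    if (f.get()) {
                        deleted++;
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return deleted;
        } finally {
            pool.shutdown();
        }
    }
}
```

In the actual archival path this would more likely be `jsc.parallelize(paths).map(...)`, mirroring how cleaning is parallelized.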
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
   }
 }
+  private void deleteReplacedFiles(HoodieInstant instant) {
+    if (!instant.isCompleted()) {
Review comment:
probably a check that this is a replace instant as well?
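The combined guard could look roughly like this; the action string mirrors Hudi's replacecommit constant but should be treated as an assumption here.

```java
class ReplacedFileGroupGuard {
    // Assumed to match HoodieTimeline.REPLACE_COMMIT_ACTION.
    static final String REPLACE_COMMIT_ACTION = "replacecommit";

    // Only completed replacecommit instants should trigger deletion of
    // replaced file groups during archival.
    static boolean shouldDelete(boolean completed, String action) {
        return completed && REPLACE_COMMIT_ACTION.equals(action);
    }
}
```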
##########
File path:
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -576,7 +592,8 @@ public String startCommit() {
rollbackPendingCommits();
}
String instantTime = HoodieActiveTimeline.createNewInstantTime();
- startCommit(instantTime);
+ HoodieTableMetaClient metaClient = createMetaClient(true);
Review comment:
you can just call startCommitWithTime(instantTime) from here?
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
}
}
+ private void deleteReplacedFiles(HoodieInstant instant) {
+ if (!instant.isCompleted()) {
+ // only delete files for completed instants
+ return;
+ }
+
+ TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+ Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
Review comment:
Do we need to ask the file system view for all the replaced file groups? This must be in the metadata already, right? As long as we can get the HoodieFileGroup objects corresponding to the file group ids in the metadata, we can go ahead? What I am suggesting is an alternative and subjectively cleaner replacement for `ensureReplaced...` above, which seems to make a dummy read to warm up the data structures. I prefer to let that happen naturally on its own as opposed to having this "special" call.
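The metadata-driven alternative could be sketched like this: derive the replaced file ids straight from the replacecommit metadata, instead of warming up the view with a dummy read. The nested map is a simplified stand-in for `HoodieReplaceCommitMetadata#getPartitionToReplaceStats` (with file ids in place of full stat objects).

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class ReplacedFileGroupsFromMetadata {
    // partitionToReplaceFileIds: partition path -> replaced file ids recorded
    // in the replacecommit metadata. No file-system-view lookup needed.
    static Set<String> replacedFileIds(Map<String, List<String>> partitionToReplaceFileIds) {
        return partitionToReplaceFileIds.values().stream()
                .flatMap(List::stream)
                .collect(Collectors.toSet());
    }
}
```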
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -110,14 +116,16 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getCommitsAndCompactionTimeline();
+    this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteActionTimeline();
+    resetFileGroupsReplaced(visibleCommitsAndCompactionTimeline);
   }
   /**
    * Adds the provided statuses into the file system view, and also caches it inside this object.
*/
protected List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
HoodieTimer timer = new HoodieTimer().startTimer();
+
Review comment:
nit: remove extra line?
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+  private static final Logger LOG = LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+  public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc,
+                                             HoodieWriteConfig config, HoodieTable table,
+                                             String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
+        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+    return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> inputRecordsRDD, WorkloadProfile profile) {
+    // get all existing fileIds to mark them as replaced
+    JavaRDD<WriteStatus> replaceStatuses = getAllReplaceWriteStatus(profile);
+    // do necessary inserts into new file groups
+    JavaRDD<WriteStatus> writeStatuses = super.processInputRecords(inputRecordsRDD, profile);
+    return writeStatuses.union(replaceStatuses);
+  }
+
+  private JavaRDD<WriteStatus> getAllReplaceWriteStatus(WorkloadProfile profile) {
+    JavaRDD<String> partitions = jsc.parallelize(new ArrayList<>(profile.getPartitionPaths()));
+    JavaRDD<WriteStatus> replaceStatuses = partitions.flatMap(partition ->
+        getAllExistingFileIds(partition).map(fileId -> getReplaceWriteStatus(partition, fileId)).iterator());
+
+    return replaceStatuses;
+  }
+
+  private Stream<String> getAllExistingFileIds(String partitionPath) {
+    // because new commit is not complete. it is safe to mark all base files as old files
+    return table.getBaseFileOnlyView().getAllBaseFiles(partitionPath).map(baseFile -> baseFile.getFileId());
+  }
+
+  private WriteStatus getReplaceWriteStatus(String partitionPath, String fileId) {
+    // mark file as 'replaced' in metadata. the actual file will be removed later by cleaner to provide snapshot isolation
+    WriteStatus status = new WriteStatus(false, 0.0);
+    status.setReplacedFileId(true);
+    status.setFileId(fileId);
+    status.setTotalErrorRecords(0);
+    status.setPartitionPath(partitionPath);
+    HoodieWriteStat replaceStat = new HoodieWriteStat();
+    status.setStat(replaceStat);
+    replaceStat.setPartitionPath(partitionPath);
+    replaceStat.setFileId(fileId);
+    replaceStat.setPath(table.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get().getPath());
+    status.getStat().setNumDeletes(Integer.MAX_VALUE); // token to indicate all rows are deleted
Review comment:
Seeing such large values in the metadata can be a bit confusing. Can we set it to -1 instead for now?
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+  private static final Logger LOG = LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+  private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+  public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc,
+                                             HoodieWriteConfig config, HoodieTable table,
+                                             String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    super(jsc, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE);
+    this.inputRecordsRDD = inputRecordsRDD;
+  }
+
+  @Override
+  public HoodieWriteMetadata execute() {
+    return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
+        config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+  }
+
+  @Override
+  protected Partitioner getPartitioner(WorkloadProfile profile) {
+    return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+  }
+
+  @Override
+  protected String getCommitActionType() {
+    return HoodieTimeline.REPLACE_COMMIT_ACTION;
+  }
+
+  @Override
+  protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> inputRecordsRDD, WorkloadProfile profile) {
+    // get all existing fileIds to mark them as replaced
+    JavaRDD<WriteStatus> replaceStatuses = getAllReplaceWriteStatus(profile);
Review comment:
So, this creates a dependency on the WorkloadProfile for doing insert overwrite. While we always provide a WorkloadProfile for now, in the future we would like to remove this need for caching data in memory and building the profile.
Can we try to reimplement this such that:
- processInputRecords(..) just writes the new records and returns WriteStatus for the new file groups alone.
- During commit time, after we collect the WriteStatus, we can obtain the `replaceStatuses` based on the partitions that were actually written to during the step above.
This also gives us a cleaner solution for avoiding the boolean flag we discussed. The API is also consistent now, in that writeClient.insertOverwrite() only returns the WriteStatus for the new file group ids.
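The restructuring could be sketched roughly like this, with simplified types (a map stands in for the base-file view lookup): write first, then compute the replaced file ids from the partitions that were actually touched, with no WorkloadProfile involved.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class InsertOverwriteSketch {
    // After the write completes, collect replaced file ids only for the
    // partitions that were actually written to. writtenPartitions would be
    // derived from the collected WriteStatus; existingFileIdsByPartition
    // stands in for table.getBaseFileOnlyView().getAllBaseFiles(partition).
    static Set<String> replacedFileIds(Set<String> writtenPartitions,
                                       Map<String, List<String>> existingFileIdsByPartition) {
        return writtenPartitions.stream()
                .flatMap(p -> existingFileIdsByPartition
                        .getOrDefault(p, Collections.emptyList()).stream())
                .collect(Collectors.toSet());
    }
}
```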
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
##########
@@ -94,6 +94,14 @@ public void setWriteStats(List<HoodieWriteStat> writeStats) {
this.writeStats = Option.of(writeStats);
}
+ public Option<List<HoodieWriteStat>> getReplacetats() {
Review comment:
typo: getReplaceStats
##########
File path:
hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
##########
@@ -586,15 +603,23 @@ public String startCommit() {
* @param instantTime Instant time to be generated
*/
public void startCommitWithTime(String instantTime) {
+ HoodieTableMetaClient metaClient = createMetaClient(true);
+ startCommitWithTime(instantTime, metaClient.getCommitActionType());
+ }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  public void startCommitWithTime(String instantTime, String actionType) {
     // NOTE : Need to ensure that rollback is done before a new commit is started
     if (rollbackPending) {
       // Only rollback inflight commit/delta-commits. Do not touch compaction commits
       rollbackPendingCommits();
     }
-    startCommit(instantTime);
+    startCommit(instantTime, actionType);
}
- private void startCommit(String instantTime) {
+ private void startCommit(String instantTime, String actionType) {
LOG.info("Generate a new instant time " + instantTime);
HoodieTableMetaClient metaClient = createMetaClient(true);
Review comment:
can we pass in the `metaClient` from the caller? This seems to introduce
additional metaClient creations, each of which lists `.hoodie` again.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -110,14 +116,16 @@ protected void init(HoodieTableMetaClient metaClient,
HoodieTimeline visibleActi
* @param visibleActiveTimeline Visible Active Timeline
*/
protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
- this.visibleCommitsAndCompactionTimeline =
visibleActiveTimeline.getCommitsAndCompactionTimeline();
+ this.visibleCommitsAndCompactionTimeline =
visibleActiveTimeline.getWriteActionTimeline();
+ resetFileGroupsReplaced(visibleCommitsAndCompactionTimeline);
Review comment:
I think it's sufficient to do this reset in the init() method above, much
like the pendingCompaction and bootstrap handling. This method is simply used
to refresh the timeline, i.e. the instants that are visible.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
##########
@@ -46,11 +46,12 @@
public static final String SCHEMA_KEY = "schema";
private static final Logger LOG =
LogManager.getLogger(HoodieCommitMetadata.class);
protected Map<String, List<HoodieWriteStat>> partitionToWriteStats;
+
Review comment:
nit: extra line
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/commit/InsertOverwriteCommitActionExecutor.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.ArrayList;
+import java.util.stream.Stream;
+
+public class InsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
+    extends CommitActionExecutor<T> {
+
+ private static final Logger LOG = LogManager.getLogger(InsertOverwriteCommitActionExecutor.class);
+
+ private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+
+ public InsertOverwriteCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config,
+     HoodieTable table, String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+   super(jsc, config, table, instantTime, WriteOperationType.INSERT_OVERWRITE);
+   this.inputRecordsRDD = inputRecordsRDD;
+ }
+
+ @Override
+ public HoodieWriteMetadata execute() {
+   return WriteHelper.write(instantTime, inputRecordsRDD, jsc, (HoodieTable<T>) table,
+       config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, false);
+ }
+
+ @Override
+ protected Partitioner getPartitioner(WorkloadProfile profile) {
+   return new InsertOverwritePartitioner<>(profile, jsc, table, config);
+ }
+
+ @Override
+ protected String getCommitActionType() {
+   return HoodieTimeline.REPLACE_COMMIT_ACTION;
+ }
+
+ @Override
+ protected JavaRDD<WriteStatus> processInputRecords(JavaRDD<HoodieRecord<T>> inputRecordsRDD, WorkloadProfile profile) {
+   // get all existing fileIds to mark them as replaced
+   JavaRDD<WriteStatus> replaceStatuses = getAllReplaceWriteStatus(profile);
+   // do necessary inserts into new file groups
+   JavaRDD<WriteStatus> writeStatuses = super.processInputRecords(inputRecordsRDD, profile);
+   return writeStatuses.union(replaceStatuses);
+ }
+
+ private JavaRDD<WriteStatus> getAllReplaceWriteStatus(WorkloadProfile profile) {
+   JavaRDD<String> partitions = jsc.parallelize(new ArrayList<>(profile.getPartitionPaths()));
+   JavaRDD<WriteStatus> replaceStatuses = partitions.flatMap(partition ->
+       getAllExistingFileIds(partition).map(fileId -> getReplaceWriteStatus(partition, fileId)).iterator());
+
+   return replaceStatuses;
+ }
+
+ private Stream<String> getAllExistingFileIds(String partitionPath) {
+   // because new commit is not complete. it is safe to mark all base files as old files
+   return table.getBaseFileOnlyView().getAllBaseFiles(partitionPath).map(baseFile -> baseFile.getFileId());
Review comment:
why limit to just base files? We may have log files without base files,
i.e. the insert-to-log-files code path.
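In other words, collect the ids from file slices rather than from base files, so log-only file groups are also marked replaced. A self-contained sketch of that idea — `FileSlice` here is a hypothetical stand-in for Hudi's type, not the real API:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class ExistingFileIdsSketch {

  // Stand-in for Hudi's FileSlice: a slice may have a base file, log files, or both.
  public record FileSlice(String fileId, Optional<String> baseFile, List<String> logFiles) {}

  // Every slice contributes its file id, whether or not it has a base file,
  // so file groups created by the insert-to-log-files path are covered too.
  public static List<String> allExistingFileIds(List<FileSlice> slices) {
    return slices.stream().map(FileSlice::fileId).distinct().collect(Collectors.toList());
  }
}
```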
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -110,14 +116,16 @@ protected void init(HoodieTableMetaClient metaClient,
HoodieTimeline visibleActi
* @param visibleActiveTimeline Visible Active Timeline
*/
protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
- this.visibleCommitsAndCompactionTimeline =
visibleActiveTimeline.getCommitsAndCompactionTimeline();
+ this.visibleCommitsAndCompactionTimeline =
visibleActiveTimeline.getWriteActionTimeline();
Review comment:
now that replace is also a `replacecommit`, should we just leave
`getCommitsAndCompactionTimeline()` be?
##########
File path: hudi-common/src/main/avro/HoodieReplaceCommitMetadata.avsc
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace":"org.apache.hudi.avro.model",
+ "type":"record",
+ "name":"HoodieReplaceCommitMetadata",
+ "fields":[
+ {
+ "name":"partitionToWriteStats",
Review comment:
for my own understanding, copying the fields from CommitMetadata is the
only way to "inherit" the avro schema, I guess?
Also, to be consistent, should we place the base fields from commit
metadata first, and add `partitionToReplaceStats` at the end?
##########
File path: hudi-client/src/main/java/org/apache/hudi/client/WriteStatus.java
##########
@@ -52,6 +52,9 @@
private HoodieWriteStat stat = null;
+ // if true, indicates the fileId in this WriteStatus is being replaced
+ private boolean isReplacedFileId;
Review comment:
rename: isReplaced
##########
File path:
hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -880,6 +880,89 @@ public void testDeletesWithDeleteApi() throws Exception {
testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140,
keysSoFar);
}
+ /**
+ * Test scenario of writing more file groups than existing number of file
groups in partition.
+ */
+ @Test
+ public void testInsertOverwritePartitionHandlingWithMoreRecords() throws
Exception {
+ verifyInsertOverwritePartitionHandling(1000, 3000);
+ }
+
+ /**
+ * Test scenario of writing fewer file groups than existing number of file
groups in partition.
+ */
+ @Test
+ public void testInsertOverwritePartitionHandlingWithFewerRecords() throws
Exception {
+ verifyInsertOverwritePartitionHandling(3000, 1000);
+ }
+
+ /**
+ * Test scenario of writing similar number file groups in partition.
+ */
+ @Test
+ public void testInsertOverwritePartitionHandlinWithSimilarNumberOfRecords()
throws Exception {
+ verifyInsertOverwritePartitionHandling(3000, 3000);
+ }
+
+ /**
+ * 1) Do write1 (upsert) with 'batch1RecordsCount' number of records.
+ * 2) Do write2 (insert overwrite) with 'batch2RecordsCount' number of
records.
+ *
+ * Verify that all records in step1 are overwritten
+ */
+ private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount,
int batch2RecordsCount) throws Exception {
+ final String testPartitionPath = "americas";
+ HoodieWriteConfig config = getSmallInsertWriteConfig(2000);
+ HoodieWriteClient client = getHoodieWriteClient(config, false);
+ dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+
+ // Do Inserts
+ String commitTime1 = "001";
+ client.startCommitWithTime(commitTime1);
+ List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1,
batch1RecordsCount);
+ JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 2);
+ List<WriteStatus> statuses = client.upsert(insertRecordsRDD1,
commitTime1).collect();
+ assertNoWriteErrors(statuses);
+ Set<String> batch1Buckets = statuses.stream().map(s ->
s.getFileId()).collect(Collectors.toSet());
+ verifyParquetFileData(commitTime1, inserts1, statuses);
+
+ // Do Insert Overwrite
+ String commitTime2 = "002";
+ client.startCommitWithTime(commitTime2,
HoodieTimeline.REPLACE_COMMIT_ACTION);
+ List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2,
batch2RecordsCount);
+ List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
+ insertsAndUpdates2.addAll(inserts2);
+ JavaRDD<HoodieRecord> insertAndUpdatesRDD2 =
jsc.parallelize(insertsAndUpdates2, 2);
+ statuses = client.insertOverwrite(insertAndUpdatesRDD2,
commitTime2).collect();
+ assertNoWriteErrors(statuses);
+ Set<String> replacedBuckets = statuses.stream().filter(s ->
s.isReplacedFileId())
+ .map(s -> s.getFileId()).collect(Collectors.toSet());
+ assertEquals(batch1Buckets, replacedBuckets);
+ List<WriteStatus> newBuckets = statuses.stream().filter(s ->
!(s.isReplacedFileId()))
+ .collect(Collectors.toList());
+ verifyParquetFileData(commitTime2, inserts2, newBuckets);
+ }
+
+ /**
+ * Verify data in parquet files matches expected records and commit time.
+ */
+ private void verifyParquetFileData(String commitTime, List<HoodieRecord>
expectedRecords, List<WriteStatus> allStatus) {
Review comment:
please keep the test naming to base and log files, and not leak parquet into
the test. Also, can you please see if this test can be authored by reusing
existing helpers? It's often a bit hard to read and reuse the existing helpers,
but the more one-offs we introduce, the worse this situation becomes.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class to generate compaction plan from FileGroup/FileSlice
abstraction.
+ */
+public class CommitUtils {
+
+ private static final Logger LOG = LogManager.getLogger(CommitUtils.class);
+
+ public static HoodieCommitMetadata buildWriteActionMetadata(List<HoodieWriteStat> writeStats,
Review comment:
please add a simple unit test for this, testing e.g. that the schema is
set, the op type is set, etc.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -196,6 +205,32 @@ protected void refreshTimeline(HoodieTimeline
visibleActiveTimeline) {
return fileGroups;
}
+ /**
+ * Get replaced instant for each file group by looking at all commit
instants.
+ */
+ private void resetFileGroupsReplaced(HoodieTimeline timeline) {
+ Instant indexStartTime = Instant.now();
Review comment:
please use HoodieTimer to time code segments.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/ActionType.java
##########
@@ -22,5 +22,5 @@
* The supported action types.
*/
public enum ActionType {
- commit, savepoint, compaction, clean, rollback
+ commit, savepoint, compaction, clean, rollback, replacecommit
Review comment:
deltacommit is not here. huh. file a "code cleanup" JIRA for later?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -880,6 +957,30 @@ private FileSlice fetchMergedFileSlice(HoodieFileGroup
fileGroup, FileSlice file
.fromJavaOptional(fetchLatestFileSlices(partitionPath).filter(fs ->
fs.getFileId().equals(fileId)).findFirst());
}
+ private boolean isFileGroupReplaced(HoodieFileGroup fileGroup) {
+   Option<HoodieInstant> hoodieInstantOption = getReplacedInstant(fileGroup.getFileGroupId());
+   return hoodieInstantOption.isPresent();
+ }
+
+ private boolean isFileGroupReplacedBeforeAny(HoodieFileGroup fileGroup, List<String> instants) {
+   Option<HoodieInstant> hoodieInstantOption = getReplacedInstant(fileGroup.getFileGroupId());
+   if (!hoodieInstantOption.isPresent()) {
+     return false;
+   }
+
+   return HoodieTimeline.compareTimestamps(instants.stream().max(Comparator.naturalOrder()).get(),
Review comment:
can you just call isFileGroupReplacedBeforeOrOn(fileGroup, max(instants)),
without having to implement the comparison again?
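The reduction being suggested, as a self-contained sketch (string instants standing in for Hudi's, whose lexicographic order matches temporal order; method names are illustrative):

```java
import java.util.Collections;
import java.util.List;

public class ReplacedBeforeSketch {

  // Stand-in for isFileGroupReplacedBeforeOrOn: a file group is replaced
  // relative to an instant iff its replace instant is <= that instant.
  public static boolean isReplacedBeforeOrOn(String replaceInstant, String instant) {
    return replaceInstant != null && replaceInstant.compareTo(instant) <= 0;
  }

  // "replaced before any of these instants" reduces to one check against the
  // maximum instant, so no second timestamp-comparison path is needed.
  public static boolean isReplacedBeforeAny(String replaceInstant, List<String> instants) {
    return isReplacedBeforeOrOn(replaceInstant, Collections.max(instants));
  }
}
```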
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * All the metadata that gets stored along with a commit.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
+ private static final Logger LOG =
LogManager.getLogger(HoodieReplaceCommitMetadata.class);
+ protected Map<String, List<HoodieWriteStat>> partitionToReplaceStats;
Review comment:
these are the file groups being replaced? I thought we were going to
just track the file ids?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
##########
@@ -727,6 +775,21 @@ private String formatPartitionKey(String partitionStr) {
*/
abstract Stream<HoodieFileGroup> fetchAllStoredFileGroups();
+ /**
+ * Track instant time for file groups replaced.
+ */
+ protected abstract void resetReplacedFileGroups(final Map<HoodieFileGroupId,
HoodieInstant> replacedFileGroups);
+
+ /**
+ * Track instant time for new file groups replaced.
+ */
+ protected abstract void addReplacedFileGroups(final Map<HoodieFileGroupId,
HoodieInstant> replacedFileGroups);
+
+ /**
+ * Track instant time for file groups replaced.
+ */
+ protected abstract Option<HoodieInstant> getReplacedInstant(final
HoodieFileGroupId fileGroupId);
Review comment:
rename: getReplaceInstant()
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
##########
@@ -113,6 +112,18 @@ public HoodieDefaultTimeline
getCommitsAndCompactionTimeline() {
return new HoodieDefaultTimeline(instants.stream().filter(s ->
validActions.contains(s.getAction())), details);
}
+ @Override
+ public HoodieDefaultTimeline getWriteActionTimeline() {
+ Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
+ return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details);
+ }
+
+ @Override
+ public HoodieTimeline getCompletedAndReplaceTimeline() {
Review comment:
rename: getCompletedReplaceTimeline(). The current name gives the
impression that it's either completed or replacecommit.
##########
File path:
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
##########
@@ -200,6 +199,14 @@ public static void createInflightCommitFiles(String
basePath, String... instantT
}
}
+ public static HoodieWriteStat createReplaceStat(final String partitionPath,
final String fileId1) {
Review comment:
there is nothing specific to replace in this method. Should we move
this to the test class itself, inlined?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
##########
@@ -64,6 +64,11 @@
*/
protected Map<HoodieFileGroupId, BootstrapBaseFileMapping>
fgIdToBootstrapBaseFile;
+ /**
+ * Track replace time for replaced file groups.
+ */
+ protected Map<HoodieFileGroupId, HoodieInstant> fgIdToReplaceInstant;
Review comment:
rename: fgIdToReplaceInstants
##########
File path: hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
##########
@@ -36,6 +36,14 @@
],
"default": null
},
+ {
Review comment:
should we just add at the end? not sure if it will be backwards
compatible nicely otherwise.
##########
File path:
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -366,6 +378,14 @@ public static void createCompactionRequestedFile(String
basePath, String instant
createEmptyFile(basePath, commitFile, configuration);
}
+ public static void createDataFile(String basePath, String partitionPath,
String instantTime, String fileID, Configuration configuration)
Review comment:
for tests, I suggest using the `HoodieWritableTestTable` etc instead of
introducing new methods. Also please check other utilities to avoid writing a
new method here
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -376,31 +379,35 @@ private[hudi] object HoodieSparkSqlWriter {
metaSyncSuccess
}
+ /**
+ * Scala says method cannot have more than 7 arguments. So group all
table/action specific information into a case class.
Review comment:
please remove the Scala-specific comment; it's good to encapsulate like
this anyway.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
##########
@@ -371,6 +371,47 @@ void
removeBootstrapBaseFileMapping(Stream<BootstrapBaseFileMapping> bootstrapBa
schemaHelper.getPrefixForSliceViewByPartitionFile(partitionPath,
fileId)).map(Pair::getValue)).findFirst());
}
+ @Override
+ protected void resetReplacedFileGroups(final Map<HoodieFileGroupId,
HoodieInstant> replacedFileGroups) {
Review comment:
@bvaradar if you can take a pass at these, that would be great
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext
jsc, HoodieInstant in
}
}
+ private void deleteReplacedFiles(HoodieInstant instant) {
+ if (!instant.isCompleted()) {
+ // only delete files for completed instants
+ return;
+ }
+
+ TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+ Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
Review comment:
One more consideration, as I went through the remainder of the PR: if
there was a pending compaction for the replaced file group, then the file
group metadata we encode may miss new base files produced as a result of the
compaction. This scenario needs to be thought through.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
##########
@@ -133,6 +137,21 @@
*/
HoodieTimeline getCommitsAndCompactionTimeline();
+ /**
+ * Timeline to just include commits (commit/deltacommit), replace and
compaction actions.
+ *
+ * @return
+ */
+ HoodieDefaultTimeline getWriteActionTimeline();
Review comment:
see earlier comment on whether we need this method.
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -301,6 +304,61 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext
jsc, HoodieInstant in
}
}
+ private void deleteReplacedFiles(HoodieInstant instant) {
+ if (!instant.isCompleted()) {
+ // only delete files for completed instants
+ return;
+ }
+
+ TableFileSystemView fileSystemView = this.table.getFileSystemView();
+ ensureReplacedPartitionsLoadedCorrectly(instant, fileSystemView);
+
+ Stream<HoodieFileGroup> fileGroupsToDelete = fileSystemView
Review comment:
As an afterthought, I also realize that if we just encoded the entire
file group being replaced into the metadata (as opposed to just the
file ids), we could delete the file groups without any interaction with
the tableFileSystemView at all. Probably an even simpler solution?
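The alternative encoding could look like this — a hypothetical sketch only (`ReplacedFileGroup` and `pathsToDelete` are illustrative names, not the PR's metadata model):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ReplaceMetadataSketch {

  // Hypothetical metadata entry: the replace commit records every physical
  // file (base and log) of each replaced file group, not just the file id.
  public record ReplacedFileGroup(String fileId, List<String> filePaths) {}

  // Archival can then compute the delete list straight from the metadata,
  // with no file-system-view lookup at all.
  public static List<String> pathsToDelete(List<ReplacedFileGroup> replaced) {
    return replaced.stream()
        .flatMap(fg -> fg.filePaths().stream())
        .collect(Collectors.toList());
  }
}
```

The trade-off is metadata size versus the view interaction (and the pending-compaction gap noted earlier, since files produced after the replace commit would still be missing from the encoded list).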
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
##########
@@ -133,6 +137,21 @@
*/
HoodieTimeline getCommitsAndCompactionTimeline();
+ /**
+ * Timeline to just include commits (commit/deltacommit), replace and
compaction actions.
+ *
+ * @return
+ */
+ HoodieDefaultTimeline getWriteActionTimeline();
Review comment:
To expand, I am wondering if we should just include replacecommit within
`getCommitsAndCompactionTimeline()`. Most of its callers are around
compaction/savepoint/restore etc., so we may not be seeing some cases here.
Things work for now, since filterCompletedInstants() etc. are including
replacecommit in the timeline when filtering for queries. Semantically, if
replace is a commit-level action that can add new data to the timeline, then
we should just treat it like a delta commit IMO.
Would anything break if we did that?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieReplaceCommitMetadata.java
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * All the metadata that gets stored along with a commit.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class HoodieReplaceCommitMetadata extends HoodieCommitMetadata {
+ private static final Logger LOG =
LogManager.getLogger(HoodieReplaceCommitMetadata.class);
+ protected Map<String, List<HoodieWriteStat>> partitionToReplaceStats;
Review comment:
if these are the file groups being replaced, then does this contain all
the file slices (see my comment around deleting the replaced file groups in
timeline archive log)
##########
File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -213,6 +213,12 @@ public abstract HoodieWriteMetadata
insertPrepped(JavaSparkContext jsc, String i
public abstract HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc,
String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner>
bulkInsertPartitioner);
+ /**
+ * Logically delete all existing records and Insert a batch of new records
into Hoodie table at the supplied instantTime.
+ */
+ public abstract HoodieWriteMetadata insertOverwrite(JavaSparkContext jsc,
String instantTime,
Review comment:
On second thought, I am okay leaving this as-is for now as well, and
re-evaluating when actually implementing clustering.
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -111,6 +111,9 @@ private[hudi] object HoodieSparkSqlWriter {
tableConfig = tableMetaClient.getTableConfig
}
+ val metaClient = new
HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get)
+ val commitActionType = DataSourceUtils.getCommitActionType(operation,
metaClient)
Review comment:
should we have a better way of getting the commit action type? I am a bit
concerned about creating a new metaClient just for this.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
##########
@@ -371,6 +371,47 @@ void
removeBootstrapBaseFileMapping(Stream<BootstrapBaseFileMapping> bootstrapBa
schemaHelper.getPrefixForSliceViewByPartitionFile(partitionPath,
fileId)).map(Pair::getValue)).findFirst());
}
+ @Override
+ protected void resetReplacedFileGroups(final Map<HoodieFileGroupId,
HoodieInstant> replacedFileGroups) {
Review comment:
A general question I have around the incremental file system view and
RocksDB-like persistent file system view storage is whether we will keep this
list updated, i.e. when archival/cleaning runs, how do we ensure the deleted
replaced file groups are no longer tracked inside RocksDB?
I guess the lines below are doing a bulk delete and insert to achieve the
same?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]