nsivabalan commented on code in PR #9523:
URL: https://github.com/apache/hudi/pull/9523#discussion_r1304953163
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -130,6 +132,53 @@ public Set<String>
createdAndMergedDataPaths(HoodieEngineContext context, int pa
return dataFiles;
}
+ public Set<String> appendedLogPaths(HoodieEngineContext context, int
parallelism) throws IOException {
Review Comment:
we might need to fix createdAndMergedDataPaths so that it does not return any log file
markers. e.g., we could have CREATE markers for log files, and due to spark
retries, we could have more than one.
So, essentially, we change createdAndMergedDataPaths to return just data
files (and not any log files).
here within appendedLogPaths, we can return any marker for log
files (irrespective of whether it's CREATE or APPEND).
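
A minimal sketch of the split suggested in this comment, under stated assumptions: it does not use the Hudi classes. `MarkerSplitSketch`, `dataPaths` and `logPaths` are hypothetical names, the marker layout `<relative data path>.marker.<IOTYPE>` is assumed, and the `.log.` name check is only a simplified stand-in for `FSUtils.isLogFile`.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for the marker handling discussed above.
final class MarkerSplitSketch {
  // Assumption: marker names look like "<relative data path>.marker.<IOTYPE>".
  static final String MARKER_EXTN = ".marker";

  // Strips ".marker.<IOTYPE>" to recover the relative data path.
  static String stripMarkerSuffix(String markerPath) {
    int idx = markerPath.lastIndexOf(MARKER_EXTN);
    if (idx < 0) {
      throw new IllegalArgumentException("Not a marker path: " + markerPath);
    }
    return markerPath.substring(0, idx);
  }

  // Returns the IO type suffix, e.g. "CREATE", "MERGE" or "APPEND".
  static String ioType(String markerPath) {
    return markerPath.substring(markerPath.lastIndexOf('.') + 1);
  }

  // Simplified check, assumed for illustration only.
  static boolean isLogFile(String dataPath) {
    String name = dataPath.substring(dataPath.lastIndexOf('/') + 1);
    return name.contains(".log.");
  }

  // Data (base) files only: CREATE or MERGE markers that do not point at a log file.
  static Set<String> dataPaths(List<String> markers) {
    Set<String> result = new TreeSet<>();
    for (String marker : markers) {
      String path = stripMarkerSuffix(marker);
      if (!isLogFile(path) && !"APPEND".equals(ioType(marker))) {
        result.add(path);
      }
    }
    return result;
  }

  // Log files only, irrespective of whether the marker is CREATE or APPEND.
  static Set<String> logPaths(List<String> markers) {
    Set<String> result = new TreeSet<>();
    for (String marker : markers) {
      String path = stripMarkerSuffix(marker);
      if (isLogFile(path)) {
        result.add(path);
      }
    }
    return result;
  }
}
```

With this shape, two retried tasks that each leave a marker for a different log file both show up in `logPaths`, and neither leaks into `dataPaths`.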
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -276,4 +283,33 @@ protected static Option<IndexedRecord>
toAvroRecord(HoodieRecord record, Schema
return Option.empty();
}
}
+
+ protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback
{
+ // here we distinguish log files created from log files being appended.
Considering following scenario:
+ // An appending task write to log file.
+ // (1) append to existing file file_instant_writetoken1.log.1
+ // (2) rollover and create file file_instant_writetoken2.log.2
+ // Then this task failed and retry by a new task.
+ // (3) append to existing file file_instant_writetoken1.log.1
+ // (4) rollover and create file file_instant_writetoken3.log.2
+ // finally file_instant_writetoken2.log.2 should not be committed to hudi,
we use marker file to delete it.
Review Comment:
minor: this documentation needs to be fixed.
we need to say that we will commit all files to hudi and also sync all duplicated
files, if any, to the Metadata table (if enabled).
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java:
##########
@@ -60,6 +80,78 @@ public
BaseSparkDeltaCommitActionExecutor(HoodieSparkEngineContext context, Hood
super(context, config, table, instantTime, operationType, extraMetadata);
}
+ protected void commit(Option<Map<String, String>> extraMetadata,
HoodieWriteMetadata<HoodieData<WriteStatus>> result, List<HoodieWriteStat>
writeStats) {
+ String actionType = getCommitActionType();
+ LOG.info("Committing " + instantTime + ", action Type " + actionType + ",
operation Type " + operationType);
+ result.setCommitted(true);
+ result.setWriteStats(writeStats);
+ // Finalize write
+ finalizeWrite(instantTime, writeStats, result);
+ try {
+ HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+ HoodieCommitMetadata metadata = addMissingLogFileIfNeeded(result);
+ writeTableMetadata(metadata, result.getWriteStatuses(), actionType);
+ activeTimeline.saveAsComplete(new HoodieInstant(true,
getCommitActionType(), instantTime),
+ Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ LOG.info("Committed " + instantTime);
+ result.setCommitMetadata(Option.of(metadata));
+ } catch (IOException e) {
+ throw new HoodieCommitException("Failed to complete commit " +
config.getBasePath() + " at time " + instantTime,
+ e);
+ }
+ }
+
+ /* In spark mor table, any failed spark task may generate log files which
are not included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT
and make MDT has correct file info.
+ */
+ private HoodieCommitMetadata
addMissingLogFileIfNeeded(HoodieWriteMetadata<HoodieData<WriteStatus>> result)
throws IOException {
Review Comment:
we can move these to static utility methods so that we can re-use the code
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java:
##########
@@ -103,35 +120,47 @@ public List<HoodieRollbackRequest>
getRollbackRequests(HoodieInstant instantToRo
}
}
- protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant
instantToRollback, String markerFilePath) throws IOException {
- Path baseFilePathForAppend = new Path(basePath, markerFilePath);
- String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
- String baseCommitTime =
FSUtils.getCommitTime(baseFilePathForAppend.getName());
- String relativePartitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), baseFilePathForAppend.getParent());
- Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(),
relativePartitionPath);
-
- // NOTE: Since we're rolling back incomplete Delta Commit, it only could
have appended its
- // block to the latest log-file
- // TODO(HUDI-1517) use provided marker-file's path instead
- Option<HoodieLogFile> latestLogFileOption =
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
- HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
-
- // Log file can be deleted if the commit to rollback is also the commit
that created the fileGroup
- if (latestLogFileOption.isPresent() &&
baseCommitTime.equals(instantToRollback.getTimestamp())) {
- Path fullDeletePath = new Path(partitionPath,
latestLogFileOption.get().getFileName());
- return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING,
EMPTY_STRING,
- Collections.singletonList(fullDeletePath.toString()),
- Collections.emptyMap());
+ protected HoodieRollbackRequest getRollbackRequestForAppend(String
markerFilePath) throws IOException {
+ Path filePath = new Path(basePath, markerFilePath);
+ String fileId;
+ String baseCommitTime;
+ String relativePartitionPath;
+ Option<HoodieLogFile> latestLogFileOption;
+
+ // Old marker files may be generated from base file name before HUDI-1517.
keep compatible with them.
+ // TODO: deprecated in HUDI-1517, may be removed in the future.
@guanziyue.gzy
+ if (FSUtils.isBaseFile(filePath)) {
Review Comment:
oh, this is already backwards compatible?
i.e. during upgrade we don't need to re-create marker files because we can
handle a mix of marker files.
nice.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -104,6 +124,84 @@ public boolean commit(String instantTime,
JavaRDD<WriteStatus> writeStatuses, Op
return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses),
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
extraPreCommitFunc);
}
+ protected void commit(HoodieTable table, String commitActionType, String
instantTime, HoodieCommitMetadata metadata,
+ List<HoodieWriteStat> stats, HoodieData<WriteStatus>
writeStatuses) throws IOException {
+ LOG.info("Committing " + instantTime + " action " + commitActionType);
+ HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+ HoodieCommitMetadata fixedCommitMetadata =
addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata);
+ // Finalize write
+ finalizeWrite(table, instantTime, stats);
+ // do save internal schema to support Implicitly add columns in write
process
+ if
(!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+ && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) &&
table.getConfig().getSchemaEvolutionEnable()) {
+ saveInternalSchema(table, instantTime, fixedCommitMetadata);
+ }
+ // update Metadata table
+ writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses);
+ activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType,
instantTime),
+
Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ }
+
+ /* In spark mor table, any failed spark task may generate log files which
are not included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT
and make MDT has correct file info.
+ */
+ private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table,
String commitActionType, String instantTime,
+ HoodieCommitMetadata
commitMetadata) throws IOException {
+ if
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
{
+ return commitMetadata;
+ }
+
+ HoodieCommitMetadata metadata = commitMetadata;
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+ // if there is log files in this delta commit, we search any invalid log
files generated by failed spark task
+ boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats()
+ .values().stream().flatMap(List::stream)
+ .anyMatch(writeStat -> FSUtils.isLogFile(new
Path(config.getBasePath(), writeStat.getPath()).getName()));
+ if (hasLogFileInDeltaCommit) {
+ // get all log files generated by log mark file
+ Set<String> logFilesMarkerPath = new
HashSet<>(markers.appendedLogPaths(context,
config.getFinalizeWriteParallelism()));
+
+ // remove valid log files
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
metadata.getPartitionToWriteStats().entrySet()) {
+ for (HoodieWriteStat hoodieWriteStat :
partitionAndWriteStats.getValue()) {
+ logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+ }
+ }
+
+ // remaining are invalid log files, let's generate write stat for them
+ if (logFilesMarkerPath.size() > 0) {
+ context.setJobStatus(this.getClass().getSimpleName(), "generate
writeStat for missing log files");
+ List<Option<HoodieDeltaWriteStat>> fakeLogFileWriteStat =
context.map(new ArrayList<>(logFilesMarkerPath), (logFilePath) -> {
+ FileSystem fileSystem = table.getMetaClient().getFs();
+ FileStatus fileStatus;
+ try {
+ fileStatus = fileSystem.getFileStatus(new
Path(config.getBasePath(), logFilePath));
+ } catch (FileNotFoundException fileNotFoundException) {
+ return Option.empty();
+ }
+
+ HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat();
+ HoodieLogFile logFile = new HoodieLogFile(fileStatus);
+ writeStat.setPath(logFilePath);
+ writeStat.setFileId(logFile.getFileId());
+ writeStat.setTotalWriteBytes(logFile.getFileSize());
+ writeStat.setFileSizeInBytes(logFile.getFileSize());
+ writeStat.setLogVersion(logFile.getLogVersion());
+
writeStat.setLogFiles(Collections.singletonList(logFile.getPath().toString()));
+ writeStat.setPartitionPath(FSUtils.getRelativePartitionPath(new
Path(config.getBasePath()), fileStatus.getPath().getParent()));
+ return Option.of(writeStat);
+ }, config.getFinalizeWriteParallelism());
+
+ // add these write stat to commit meta
Review Comment:
let's add documentation (comments) on why a file could be missing
here. I mean, a marker refers to a file, but the actual file is missing.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java:
##########
@@ -343,27 +359,73 @@ public void testSimpleInsertsGeneratedIntoLogFiles()
throws Exception {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD,
newCommitTime);
+ long expectedLogFileNum = statuses.map(writeStatus ->
(HoodieDeltaWriteStat) writeStatus.getStat())
+ .flatMap(deltaWriteStat -> deltaWriteStat.getLogFiles().iterator())
+ .count();
+ // inject a fake log file to test marker file for log file
+ HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat)
statuses.map(WriteStatus::getStat).take(1).get(0);
+ assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath())));
+ HoodieLogFile correctLogFile = new
HoodieLogFile(correctWriteStat.getPath());
+ String correctWriteToken =
FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath());
+
+ final String fakeToken = generateFakeWriteToken(correctWriteToken);
+
+ final WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(),
+ HoodieSparkTable.create(config, context()), newCommitTime);
+ HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder()
+ .onParentPath(FSUtils.getPartitionPath(config.getBasePath(),
correctWriteStat.getPartitionPath()))
+
.withFileId(correctWriteStat.getFileId()).overBaseCommit(newCommitTime)
+ .withLogVersion(correctLogFile.getLogVersion())
+ .withFileSize(0L)
+ .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs())
+ .withRolloverLogWriteToken(fakeToken)
+ .withLogWriteToken(fakeToken)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogWriteCallback(new HoodieLogFileWriteCallback() {
+ @Override
+ public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+ return writeMarkers.create(correctWriteStat.getPartitionPath(),
logFileToAppend.getFileName(), IOType.APPEND).isPresent();
+ }
+
+ @Override
+ public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+ return writeMarkers.create(correctWriteStat.getPartitionPath(),
logFileToCreate.getFileName(), IOType.APPEND).isPresent();
+ }
+ }).build();
+ AppendResult fakeAppendResult =
fakeLogWriter.appendBlock(getLogBlock(records, config.getSchema()));
Review Comment:
let's not call it fake.
let's name these additionalLogWriter, additionalAppendResult etc.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -130,6 +132,53 @@ public Set<String>
createdAndMergedDataPaths(HoodieEngineContext context, int pa
return dataFiles;
}
+ public Set<String> appendedLogPaths(HoodieEngineContext context, int
parallelism) throws IOException {
+ Set<String> logFiles = new HashSet<>();
+
+ FileStatus[] topLevelStatuses = fs.listStatus(markerDirPath);
+ List<String> subDirectories = new ArrayList<>();
+ for (FileStatus topLevelStatus: topLevelStatuses) {
+ if (topLevelStatus.isFile()) {
+ String pathStr = topLevelStatus.getPath().toString();
+ if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) &&
pathStr.endsWith(IOType.APPEND.name())) {
+ logFiles.add(translateMarkerToDataPath(pathStr));
+ }
+ } else {
+ subDirectories.add(topLevelStatus.getPath().toString());
+ }
+ }
+
+ if (subDirectories.size() > 0) {
Review Comment:
there is an opportunity for code re-use here.
I understand you want to quickly put out the patch, but before we land, let's
try to re-use the code.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java:
##########
@@ -78,22 +82,35 @@ public List<HoodieRollbackRequest>
getRollbackRequests(HoodieInstant instantToRo
return context.map(markerPaths, markerFilePath -> {
String typeStr =
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
+ String partitionFilePath =
WriteMarkers.stripMarkerSuffix(markerFilePath);
+ Path fullFilePath = new Path(basePath, partitionFilePath);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullFilePath.getParent());
switch (type) {
case MERGE:
case CREATE:
- String fileToDelete =
WriteMarkers.stripMarkerSuffix(markerFilePath);
- Path fullDeletePath = new Path(basePath, fileToDelete);
- String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullDeletePath.getParent());
- return new HoodieRollbackRequest(partitionPath, EMPTY_STRING,
EMPTY_STRING,
- Collections.singletonList(fullDeletePath.toString()),
+ String fileId = null;
+ String baseInstantTime = null;
+ if (FSUtils.isBaseFile(fullFilePath)) {
Review Comment:
one thing to remember: we need to create markers for log files during
upgrade.
i.e. just before upgrade, if there was a failed commit, the log files
produced by it are not in the new format. So, during upgrade we might need to
track them down and create markers in this new format before rollback kicks in.
not required in this patch itself, but in a follow-up patch.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java:
##########
@@ -78,22 +82,35 @@ public List<HoodieRollbackRequest>
getRollbackRequests(HoodieInstant instantToRo
return context.map(markerPaths, markerFilePath -> {
String typeStr =
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
+ String partitionFilePath =
WriteMarkers.stripMarkerSuffix(markerFilePath);
+ Path fullFilePath = new Path(basePath, partitionFilePath);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullFilePath.getParent());
switch (type) {
case MERGE:
case CREATE:
- String fileToDelete =
WriteMarkers.stripMarkerSuffix(markerFilePath);
- Path fullDeletePath = new Path(basePath, fileToDelete);
- String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullDeletePath.getParent());
- return new HoodieRollbackRequest(partitionPath, EMPTY_STRING,
EMPTY_STRING,
- Collections.singletonList(fullDeletePath.toString()),
+ String fileId = null;
+ String baseInstantTime = null;
+ if (FSUtils.isBaseFile(fullFilePath)) {
+ HoodieBaseFile baseFileToDelete = new
HoodieBaseFile(fullFilePath.toString());
+ fileId = baseFileToDelete.getFileId();
+ baseInstantTime = baseFileToDelete.getCommitTime();
+ } else if (FSUtils.isLogFile(fullFilePath)) {
+ // TODO: HUDI-1517 may distinguish log file created from log
file being appended in the future @guanziyue
+ // Now it should not have create type
+ checkArgument(type != IOType.CREATE, "Log file should not
support create io type now");
+ checkArgument(type != IOType.MERGE, "Log file should not support
merge io type");
Review Comment:
oh I see, we are creating APPEND markers for all log files irrespective of whether
the file was created or appended to.
makes sense to not complicate things.
I also initially thought about LOG_CREATE and LOG_APPEND.
but the purpose of these markers is reconciliation, and for log
files we are not going to delete anything; we treat everything as appends or new files
created. so not a bad idea to go with APPEND.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -104,6 +124,84 @@ public boolean commit(String instantTime,
JavaRDD<WriteStatus> writeStatuses, Op
return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses),
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
extraPreCommitFunc);
}
+ protected void commit(HoodieTable table, String commitActionType, String
instantTime, HoodieCommitMetadata metadata,
+ List<HoodieWriteStat> stats, HoodieData<WriteStatus>
writeStatuses) throws IOException {
+ LOG.info("Committing " + instantTime + " action " + commitActionType);
+ HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+ HoodieCommitMetadata fixedCommitMetadata =
addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata);
+ // Finalize write
+ finalizeWrite(table, instantTime, stats);
+ // do save internal schema to support Implicitly add columns in write
process
+ if
(!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+ && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) &&
table.getConfig().getSchemaEvolutionEnable()) {
+ saveInternalSchema(table, instantTime, fixedCommitMetadata);
+ }
+ // update Metadata table
+ writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses);
+ activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType,
instantTime),
+
Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ }
+
+ /* In spark mor table, any failed spark task may generate log files which
are not included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT
and make MDT has correct file info.
+ */
+ private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table,
String commitActionType, String instantTime,
+ HoodieCommitMetadata
commitMetadata) throws IOException {
+ if
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
{
+ return commitMetadata;
+ }
+
+ HoodieCommitMetadata metadata = commitMetadata;
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+ // if there is log files in this delta commit, we search any invalid log
files generated by failed spark task
+ boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats()
+ .values().stream().flatMap(List::stream)
+ .anyMatch(writeStat -> FSUtils.isLogFile(new
Path(config.getBasePath(), writeStat.getPath()).getName()));
+ if (hasLogFileInDeltaCommit) {
+ // get all log files generated by log mark file
+ Set<String> logFilesMarkerPath = new
HashSet<>(markers.appendedLogPaths(context,
config.getFinalizeWriteParallelism()));
+
+ // remove valid log files
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
metadata.getPartitionToWriteStats().entrySet()) {
+ for (HoodieWriteStat hoodieWriteStat :
partitionAndWriteStats.getValue()) {
+ logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+ }
+ }
+
+ // remaining are invalid log files, let's generate write stat for them
+ if (logFilesMarkerPath.size() > 0) {
+ context.setJobStatus(this.getClass().getSimpleName(), "generate
writeStat for missing log files");
+ List<Option<HoodieDeltaWriteStat>> fakeLogFileWriteStat =
context.map(new ArrayList<>(logFilesMarkerPath), (logFilePath) -> {
Review Comment:
lets rename this.
may be, additionalLogFileWriteStat
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -104,6 +124,84 @@ public boolean commit(String instantTime,
JavaRDD<WriteStatus> writeStatuses, Op
return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses),
writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
extraPreCommitFunc);
}
+ protected void commit(HoodieTable table, String commitActionType, String
instantTime, HoodieCommitMetadata metadata,
+ List<HoodieWriteStat> stats, HoodieData<WriteStatus>
writeStatuses) throws IOException {
+ LOG.info("Committing " + instantTime + " action " + commitActionType);
+ HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+ HoodieCommitMetadata fixedCommitMetadata =
addMissingLogFileIfNeeded(table, commitActionType, instantTime, metadata);
+ // Finalize write
+ finalizeWrite(table, instantTime, stats);
+ // do save internal schema to support Implicitly add columns in write
process
+ if
(!fixedCommitMetadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+ && fixedCommitMetadata.getExtraMetadata().containsKey(SCHEMA_KEY) &&
table.getConfig().getSchemaEvolutionEnable()) {
+ saveInternalSchema(table, instantTime, fixedCommitMetadata);
+ }
+ // update Metadata table
+ writeTableMetadata(table, instantTime, fixedCommitMetadata, writeStatuses);
+ activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType,
instantTime),
+
Option.of(fixedCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ }
+
+ /* In spark mor table, any failed spark task may generate log files which
are not included in write status.
+ * We need to add these to CommitMetadata so that it will be synced to MDT
and make MDT has correct file info.
+ */
+ private HoodieCommitMetadata addMissingLogFileIfNeeded(HoodieTable table,
String commitActionType, String instantTime,
+ HoodieCommitMetadata
commitMetadata) throws IOException {
+ if
(!table.getMetaClient().getTableConfig().getTableType().equals(HoodieTableType.MERGE_ON_READ)
+ || !commitActionType.equals(HoodieActiveTimeline.DELTA_COMMIT_ACTION))
{
+ return commitMetadata;
+ }
+
+ HoodieCommitMetadata metadata = commitMetadata;
+ WriteMarkers markers = WriteMarkersFactory.get(config.getMarkersType(),
table, instantTime);
+ // if there is log files in this delta commit, we search any invalid log
files generated by failed spark task
+ boolean hasLogFileInDeltaCommit = metadata.getPartitionToWriteStats()
+ .values().stream().flatMap(List::stream)
+ .anyMatch(writeStat -> FSUtils.isLogFile(new
Path(config.getBasePath(), writeStat.getPath()).getName()));
+ if (hasLogFileInDeltaCommit) {
+ // get all log files generated by log mark file
+ Set<String> logFilesMarkerPath = new
HashSet<>(markers.appendedLogPaths(context,
config.getFinalizeWriteParallelism()));
+
+ // remove valid log files
+ for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
metadata.getPartitionToWriteStats().entrySet()) {
+ for (HoodieWriteStat hoodieWriteStat :
partitionAndWriteStats.getValue()) {
+ logFilesMarkerPath.remove(hoodieWriteStat.getPath());
+ }
+ }
+
+ // remaining are invalid log files, let's generate write stat for them
+ if (logFilesMarkerPath.size() > 0) {
+ context.setJobStatus(this.getClass().getSimpleName(), "generate
writeStat for missing log files");
+ List<Option<HoodieDeltaWriteStat>> fakeLogFileWriteStat =
context.map(new ArrayList<>(logFilesMarkerPath), (logFilePath) -> {
+ FileSystem fileSystem = table.getMetaClient().getFs();
+ FileStatus fileStatus;
+ try {
+ fileStatus = fileSystem.getFileStatus(new
Path(config.getBasePath(), logFilePath));
Review Comment:
let's move this file status/file size deduction to a private method.
maybe we want to optimize by doing one listing per partition. for hdfs,
we can add different logic, but for cloud stores, this will result in too many
remote calls.
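
A rough sketch of the "one listing per partition" idea, assuming nothing about the actual Hudi or Hadoop APIs: `PartitionListingSketch`, `resolveSizes` and the `lister` callback (partition path in, file name to size map out) are hypothetical names introduced only for illustration. In the real code the callback would presumably wrap a single directory listing per partition.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical helper: resolves file sizes with one listing call per partition
// instead of one status call per file.
final class PartitionListingSketch {

  // logFilePaths: relative paths such as "partition/.fileId_instant.log.1_token".
  // lister: assumed callback that lists one partition and returns file name -> size.
  static Map<String, Long> resolveSizes(Collection<String> logFilePaths,
                                        Function<String, Map<String, Long>> lister) {
    // Group the candidate files by their parent partition.
    Map<String, List<String>> byPartition = new TreeMap<>();
    for (String path : logFilePaths) {
      int slash = path.lastIndexOf('/');
      String partition = slash < 0 ? "" : path.substring(0, slash);
      byPartition.computeIfAbsent(partition, k -> new ArrayList<>()).add(path);
    }

    Map<String, Long> sizes = new TreeMap<>();
    for (Map.Entry<String, List<String>> entry : byPartition.entrySet()) {
      // Exactly one remote call per partition.
      Map<String, Long> listing = lister.apply(entry.getKey());
      for (String path : entry.getValue()) {
        String name = path.substring(path.lastIndexOf('/') + 1);
        Long size = listing.get(name);
        if (size != null) {
          sizes.put(path, size);
        }
        // A marker whose file is absent from the listing is simply skipped.
      }
    }
    return sizes;
  }
}
```

The number of remote calls then scales with the number of partitions touched rather than with the number of extra log files, which is the concern raised for cloud stores.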
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java:
##########
@@ -60,6 +80,78 @@ public
BaseSparkDeltaCommitActionExecutor(HoodieSparkEngineContext context, Hood
super(context, config, table, instantTime, operationType, extraMetadata);
}
+ protected void commit(Option<Map<String, String>> extraMetadata,
HoodieWriteMetadata<HoodieData<WriteStatus>> result, List<HoodieWriteStat>
writeStats) {
Review Comment:
I see why we are overriding the entire commit method here.
can we introduce a method called `appendMetadataForMissingFiles()`?
the default in BaseSparkCommitActionExecutor will be a no-op (it will return
the original commit metadata as is).
here in BaseSparkDeltaCommitActionExecutor, instead of overriding the entire
commit method, we can override just appendMetadataForMissingFiles.
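
A small sketch of the hook proposed here, using stripped-down stand-in classes rather than the real executors: `CommitMetadataSketch`, `BaseExecutorSketch` and `DeltaExecutorSketch` are hypothetical, and only the method name `appendMetadataForMissingFiles` comes from the comment. The point is that the commit flow stays in one place and the delta executor overrides a single step.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the commit metadata; not the Hudi type.
class CommitMetadataSketch {
  final List<String> files = new ArrayList<>();
}

// Hypothetical stand-in for the base executor.
class BaseExecutorSketch {
  // Shared commit flow; subclasses customize only the hook below.
  final CommitMetadataSketch commit(CommitMetadataSketch metadata) {
    CommitMetadataSketch finalMetadata = appendMetadataForMissingFiles(metadata);
    // ... here the real flow would update the metadata table and complete the instant ...
    return finalMetadata;
  }

  // Default is a no-op: the original commit metadata is returned as is.
  protected CommitMetadataSketch appendMetadataForMissingFiles(CommitMetadataSketch metadata) {
    return metadata;
  }
}

// Hypothetical stand-in for the delta commit executor.
class DeltaExecutorSketch extends BaseExecutorSketch {
  private final List<String> additionalLogFiles;

  DeltaExecutorSketch(List<String> additionalLogFiles) {
    this.additionalLogFiles = additionalLogFiles;
  }

  // Adds log files that have markers but are absent from the write statuses.
  @Override
  protected CommitMetadataSketch appendMetadataForMissingFiles(CommitMetadataSketch metadata) {
    CommitMetadataSketch copy = new CommitMetadataSketch();
    copy.files.addAll(metadata.files);
    for (String logFile : additionalLogFiles) {
      if (!copy.files.contains(logFile)) {
        copy.files.add(logFile);
      }
    }
    return copy;
  }
}
```

Making `commit` final in the sketch is a design choice for illustration: it keeps subclasses from re-implementing the whole flow and drifting from the base behaviour.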
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -130,6 +132,53 @@ public Set<String>
createdAndMergedDataPaths(HoodieEngineContext context, int pa
return dataFiles;
}
+ public Set<String> appendedLogPaths(HoodieEngineContext context, int
parallelism) throws IOException {
Review Comment:
may be we can name this
createdAndAppendedLogFilePaths
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java:
##########
@@ -844,6 +847,48 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>>
twoUpsertCommitDataWithTwoP
return Pair.of(records, records2);
}
+ /**
+ * Since how markers are generated for log file changed in Version Six, we
regenerate markers in the way version zero do.
+ *
+ * @param table instance of {@link HoodieTable}
+ */
+ private void prepForUpgradeFromZeroToOne(HoodieTable table) throws
IOException {
+ List<HoodieInstant> instantsToBeParsed =
+ metaClient.getActiveTimeline()
+ .getCommitsTimeline()
+ .getInstantsAsStream()
+ .collect(Collectors.toList());
+ for (HoodieInstant instant : instantsToBeParsed) {
+ WriteMarkers writeMarkers =
+ WriteMarkersFactory.get(table.getConfig().getMarkersType(), table,
instant.getTimestamp());
+ Set<String> oldMarkers = writeMarkers.allMarkerFilePaths();
+ boolean hasAppendMarker = oldMarkers.stream().anyMatch(marker ->
marker.contains(IOType.APPEND.name())
Review Comment:
why both APPEND and CREATE?
I guess APPEND alone should suffice, right?
we need to re-generate markers only for log files, right?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java:
##########
@@ -343,27 +359,73 @@ public void testSimpleInsertsGeneratedIntoLogFiles()
throws Exception {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD,
newCommitTime);
+ long expectedLogFileNum = statuses.map(writeStatus ->
(HoodieDeltaWriteStat) writeStatus.getStat())
+ .flatMap(deltaWriteStat -> deltaWriteStat.getLogFiles().iterator())
+ .count();
+ // inject a fake log file to test marker file for log file
+ HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat)
statuses.map(WriteStatus::getStat).take(1).get(0);
+ assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath())));
+ HoodieLogFile correctLogFile = new
HoodieLogFile(correctWriteStat.getPath());
+ String correctWriteToken =
FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath());
+
+ final String fakeToken = generateFakeWriteToken(correctWriteToken);
+
+ final WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(),
+ HoodieSparkTable.create(config, context()), newCommitTime);
+ HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder()
+ .onParentPath(FSUtils.getPartitionPath(config.getBasePath(),
correctWriteStat.getPartitionPath()))
+
.withFileId(correctWriteStat.getFileId()).overBaseCommit(newCommitTime)
+ .withLogVersion(correctLogFile.getLogVersion())
+ .withFileSize(0L)
+ .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs())
+ .withRolloverLogWriteToken(fakeToken)
+ .withLogWriteToken(fakeToken)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogWriteCallback(new HoodieLogFileWriteCallback() {
+ @Override
+ public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
+ return writeMarkers.create(correctWriteStat.getPartitionPath(),
logFileToAppend.getFileName(), IOType.APPEND).isPresent();
+ }
+
+ @Override
+ public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
+ return writeMarkers.create(correctWriteStat.getPartitionPath(),
logFileToCreate.getFileName(), IOType.APPEND).isPresent();
+ }
+ }).build();
+ AppendResult fakeAppendResult =
fakeLogWriter.appendBlock(getLogBlock(records, config.getSchema()));
+ fakeLogWriter.close();
+ // check marker for fake log generated
+ assertTrue(writeMarkers.allMarkerFilePaths().stream().anyMatch(marker ->
marker.contains(fakeToken)));
+ SyncableFileSystemView unCommittedFsView =
getFileSystemViewWithUnCommittedSlices(metaClient);
+ // check fake log generated
+
assertTrue(unCommittedFsView.getAllFileSlices(correctWriteStat.getPartitionPath())
+ .flatMap(FileSlice::getLogFiles).map(HoodieLogFile::getPath)
+ .anyMatch(path ->
path.getName().equals(fakeAppendResult.logFile().getPath().getName())));
writeClient.commit(newCommitTime, statuses);
HoodieTable table = HoodieSparkTable.create(config, context(),
metaClient);
table.getHoodieView().sync();
TableFileSystemView.SliceView tableRTFileSystemView =
table.getSliceView();
-
+ // get log file number from filesystem view
long numLogFiles = 0;
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> allSlices =
tableRTFileSystemView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
assertEquals(0, allSlices.stream().filter(fileSlice ->
fileSlice.getBaseFile().isPresent()).count());
assertTrue(allSlices.stream().anyMatch(fileSlice ->
fileSlice.getLogFiles().count() > 0));
- long logFileCount = allSlices.stream().filter(fileSlice ->
fileSlice.getLogFiles().count() > 0).count();
+ long logFileCount = allSlices.stream().mapToLong(fileSlice ->
fileSlice.getLogFiles().count()).sum();
if (logFileCount > 0) {
// check the log versions start from the base version
assertTrue(allSlices.stream().map(slice ->
slice.getLogFiles().findFirst().get().getLogVersion())
.allMatch(version ->
version.equals(HoodieLogFile.LOGFILE_BASE_VERSION)));
}
numLogFiles += logFileCount;
}
-
- assertTrue(numLogFiles > 0);
+ // check log file number in file system cover both valid log file and
invalid log file
Review Comment:
minor.
"check log file number in file system to cover all log files including
additional log files created with spark task retries".