This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3d9801a9ea2 Revert "[HUDI-1517] create marker file for every log file (#4913)" (#9251)
3d9801a9ea2 is described below
commit 3d9801a9ea2ca663b8540c21c1adbb4fb64cb95f
Author: Danny Chan <[email protected]>
AuthorDate: Fri Jul 21 20:01:06 2023 +0800
Revert "[HUDI-1517] create marker file for every log file (#4913)" (#9251)
This reverts commit df64d647
---
.../hudi/cli/integ/ITTestMarkersCommand.java | 4 +-
.../org/apache/hudi/io/HoodieAppendHandle.java | 7 ++
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 33 --------
.../java/org/apache/hudi/table/HoodieTable.java | 24 ++----
.../rollback/MarkerBasedRollbackStrategy.java | 96 ++++++++--------------
.../hudi/table/marker/DirectWriteMarkers.java | 4 +-
.../marker/TimelineServerBasedWriteMarkers.java | 8 +-
.../org/apache/hudi/table/marker/WriteMarkers.java | 46 +++++------
.../providers/HoodieMetaClientProvider.java | 14 ----
.../java/org/apache/hudi/io/FlinkAppendHandle.java | 27 +++---
.../TestMergeOnReadRollbackActionExecutor.java | 23 ++----
...dieSparkMergeOnReadTableInsertUpdateDelete.java | 82 +-----------------
.../TestHoodieSparkMergeOnReadTableRollback.java | 13 +++
.../TestMarkerBasedRollbackStrategy.java | 59 ++++++-------
.../hudi/table/upgrade/TestUpgradeDowngrade.java | 45 ----------
.../table/log/HoodieLogFileWriteCallback.java | 42 ----------
.../hudi/common/table/log/HoodieLogFormat.java | 15 +---
.../common/table/log/HoodieLogFormatWriter.java | 22 ++---
.../hudi/common/testutils/FileCreateUtils.java | 52 ++----------
.../hudi/common/testutils/HoodieTestTable.java | 10 +--
.../hudi/sink/bucket/ITTestBucketStreamWrite.java | 8 +-
.../sql/hudi/procedure/TestCallProcedure.scala | 2 +-
22 files changed, 171 insertions(+), 465 deletions(-)
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java
index 934164757ae..5aacfd82de0 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java
@@ -69,8 +69,8 @@ public class ITTestMarkersCommand extends
HoodieCLIIntegrationTestBase {
// generate markers
String instantTime1 = "101";
- FileCreateUtils.createLogFileMarker(tablePath, "partA", instantTime1,
"f0", IOType.APPEND);
- FileCreateUtils.createLogFileMarker(tablePath, "partA", instantTime1,
"f1", IOType.APPEND);
+ FileCreateUtils.createMarkerFile(tablePath, "partA", instantTime1, "f0",
IOType.APPEND);
+ FileCreateUtils.createMarkerFile(tablePath, "partA", instantTime1, "f1",
IOType.APPEND);
assertEquals(2, FileCreateUtils.getTotalMarkerFileCount(tablePath,
"partA", instantTime1, IOType.APPEND));
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 1c84d49af29..d0819aa8007 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -201,6 +201,13 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
new Path(config.getBasePath()),
FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
hoodieTable.getPartitionMetafileFormat());
partitionMetadata.trySave(getPartitionId());
+
+ // Since the actual log file written to can be different based on when
rollover happens, we use the
+ // base file to denote some log appends happened on a slice.
writeToken will still fence concurrent
+ // writers.
+ // https://issues.apache.org/jira/browse/HUDI-1517
+ createMarkerFile(partitionPath,
FSUtils.makeBaseFileName(baseInstantTime, writeToken, fileId,
hoodieTable.getBaseFileExtension()));
+
this.writer = createLogWriter(fileSlice, baseInstantTime);
} catch (Exception e) {
LOG.error("Error in update task at commit " + instantTime, e);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index a63050ded5f..9e716a280e8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFileWriteCallback;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -40,7 +39,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.marker.WriteMarkers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.avro.Schema;
@@ -257,14 +255,9 @@ public abstract class HoodieWriteHandle<T, I, K, O>
extends HoodieIOHandle<T, I,
.withRolloverLogWriteToken(writeToken)
.withLogWriteToken(latestLogFile.map(x ->
FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
.withSuffix(suffix)
- .withLogWriteCallback(getLogWriteCallback())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}
- protected HoodieLogFileWriteCallback getLogWriteCallback() {
- return new AppendLogWriteCallback();
- }
-
protected HoodieLogFormat.Writer createLogWriter(String baseCommitTime,
String fileSuffix) {
try {
return createLogWriter(Option.empty(),baseCommitTime, fileSuffix);
@@ -283,30 +276,4 @@ public abstract class HoodieWriteHandle<T, I, K, O>
extends HoodieIOHandle<T, I,
return Option.empty();
}
}
-
- protected class AppendLogWriteCallback implements HoodieLogFileWriteCallback
{
- // here we distinguish log files created from log files being appended.
Considering following scenario:
- // An appending task write to log file.
- // (1) append to existing file file_instant_writetoken1.log.1
- // (2) rollover and create file file_instant_writetoken2.log.2
- // Then this task failed and retry by a new task.
- // (3) append to existing file file_instant_writetoken1.log.1
- // (4) rollover and create file file_instant_writetoken3.log.2
- // finally file_instant_writetoken2.log.2 should not be committed to hudi,
we use marker file to delete it.
- // keep in mind that log file is not always fail-safe unless it never roll
over
-
- @Override
- public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
- WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
- return writeMarkers.createIfNotExists(partitionPath,
logFileToAppend.getFileName(), IOType.APPEND,
- config, fileId,
hoodieTable.getMetaClient().getActiveTimeline()).isPresent();
- }
-
- @Override
- public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
- WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
- return writeMarkers.create(partitionPath, logFileToCreate.getFileName(),
IOType.CREATE,
- config, fileId,
hoodieTable.getMetaClient().getActiveTimeline()).isPresent();
- }
- }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 42a12bbcd9e..71295098f03 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -97,7 +97,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
@@ -728,24 +727,19 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
return;
}
- // we are not including log appends for update here, since they are
already fail-safe.
- // but log files created is included
- Set<String> invalidFilePaths = getInvalidDataPaths(markers);
- Set<String> validFilePaths = stats.stream()
+ // we are not including log appends here, since they are already
fail-safe.
+ Set<String> invalidDataPaths = getInvalidDataPaths(markers);
+ Set<String> validDataPaths = stats.stream()
.map(HoodieWriteStat::getPath)
- .collect(Collectors.toSet());
- Set<String> validCdcFilePaths = stats.stream()
- .map(HoodieWriteStat::getCdcStats)
- .filter(Objects::nonNull)
- .flatMap(cdcStat -> cdcStat.keySet().stream())
+ .filter(p -> p.endsWith(this.getBaseFileExtension()))
.collect(Collectors.toSet());
// Contains list of partially created files. These needs to be cleaned
up.
- invalidFilePaths.removeAll(validFilePaths);
- invalidFilePaths.removeAll(validCdcFilePaths);
- if (!invalidFilePaths.isEmpty()) {
- LOG.info("Removing duplicate files created due to task retries before
committing. Paths=" + invalidFilePaths);
- Map<String, List<Pair<String, String>>> invalidPathsByPartition =
invalidFilePaths.stream()
+ invalidDataPaths.removeAll(validDataPaths);
+
+ if (!invalidDataPaths.isEmpty()) {
+ LOG.info("Removing duplicate data files created due to task retries
before committing. Paths=" + invalidDataPaths);
+ Map<String, List<Pair<String, String>>> invalidPathsByPartition =
invalidDataPaths.stream()
.map(dp -> Pair.of(new Path(basePath, dp).getParent().toString(),
new Path(basePath, dp).toString()))
.collect(Collectors.groupingBy(Pair::getKey));
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
index 099f1bdcbfc..791191c0ef3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java
@@ -18,24 +18,21 @@
package org.apache.hudi.table.action.rollback;
-import org.apache.hadoop.fs.Path;
-
import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.MarkerBasedRollbackUtils;
import org.apache.hudi.table.marker.WriteMarkers;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,9 +41,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING;
/**
* Performs rollback using marker files generated during the write..
@@ -82,32 +78,22 @@ public class MarkerBasedRollbackStrategy<T, I, K, O>
implements BaseRollbackPlan
return context.map(markerPaths, markerFilePath -> {
String typeStr =
markerFilePath.substring(markerFilePath.lastIndexOf(".") + 1);
IOType type = IOType.valueOf(typeStr);
- String partitionFilePath =
WriteMarkers.stripMarkerSuffix(markerFilePath);
- Path fullFilePath = new Path(basePath, partitionFilePath);
- String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullFilePath.getParent());
switch (type) {
case MERGE:
case CREATE:
- String fileId = null;
- String baseInstantTime = null;
- if (FSUtils.isBaseFile(fullFilePath)) {
- HoodieBaseFile baseFileToDelete = new
HoodieBaseFile(fullFilePath.toString());
- fileId = baseFileToDelete.getFileId();
- baseInstantTime = baseFileToDelete.getCommitTime();
- } else if (FSUtils.isLogFile(fullFilePath)) {
- checkArgument(type != IOType.MERGE, "Log file should not support
merge io type");
- HoodieLogFile logFileToDelete = new
HoodieLogFile(fullFilePath.toString());
- fileId = logFileToDelete.getFileId();
- baseInstantTime = logFileToDelete.getBaseCommitTime();
- }
- Objects.requireNonNull(fileId, "Cannot find valid fileId from
path: " + fullFilePath);
- Objects.requireNonNull(baseInstantTime, "Cannot find valid base
instant from path: " + fullFilePath);
- return new HoodieRollbackRequest(partitionPath, fileId,
baseInstantTime,
- Collections.singletonList(fullFilePath.toString()),
+ String fileToDelete =
WriteMarkers.stripMarkerSuffix(markerFilePath);
+ Path fullDeletePath = new Path(basePath, fileToDelete);
+ String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullDeletePath.getParent());
+ return new HoodieRollbackRequest(partitionPath, EMPTY_STRING,
EMPTY_STRING,
+ Collections.singletonList(fullDeletePath.toString()),
Collections.emptyMap());
case APPEND:
- HoodieRollbackRequest rollbackRequestForAppend =
getRollbackRequestForAppend(partitionFilePath);
- return rollbackRequestForAppend;
+ // NOTE: This marker file-path does NOT correspond to a log-file,
but rather is a phony
+ // path serving as a "container" for the following
components:
+ // - Base file's file-id
+ // - Base file's commit instant
+ // - Partition path
+ return getRollbackRequestForAppend(instantToRollback,
WriteMarkers.stripMarkerSuffix(markerFilePath));
default:
throw new HoodieRollbackException("Unknown marker type, during
rollback of " + instantToRollback);
}
@@ -117,47 +103,35 @@ public class MarkerBasedRollbackStrategy<T, I, K, O>
implements BaseRollbackPlan
}
}
- protected HoodieRollbackRequest getRollbackRequestForAppend(String
markerFilePath) throws IOException {
- Path filePath = new Path(basePath, markerFilePath);
- String fileId;
- String baseCommitTime;
- String relativePartitionPath;
- Option<HoodieLogFile> latestLogFileOption;
-
- // Old marker files may be generated from base file name before HUDI-1517.
keep compatible with them.
- // TODO: may deprecated in the future. @guanziyue.gzy
- if (FSUtils.isBaseFile(filePath)) {
- LOG.warn("Find old marker type for log file: " + markerFilePath);
- fileId = FSUtils.getFileIdFromFilePath(filePath);
- baseCommitTime = FSUtils.getCommitTime(filePath.getName());
- relativePartitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), filePath.getParent());
- Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(),
relativePartitionPath);
-
- // NOTE: Since we're rolling back incomplete Delta Commit, it only could
have appended its
- // block to the latest log-file
- try {
- latestLogFileOption =
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
- HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
- } catch (IOException ioException) {
- throw new HoodieIOException(
- "Failed to get latestLogFile for fileId: " + fileId + " in
partition: " + partitionPath,
- ioException);
- }
- } else {
- HoodieLogFile latestLogFile = new HoodieLogFile(filePath);
- fileId = latestLogFile.getFileId();
- baseCommitTime = latestLogFile.getBaseCommitTime();
- relativePartitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), filePath.getParent());
- latestLogFileOption = Option.of(latestLogFile);
+ protected HoodieRollbackRequest getRollbackRequestForAppend(HoodieInstant
instantToRollback, String markerFilePath) throws IOException {
+ Path baseFilePathForAppend = new Path(basePath, markerFilePath);
+ String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend);
+ String baseCommitTime =
FSUtils.getCommitTime(baseFilePathForAppend.getName());
+ String relativePartitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), baseFilePathForAppend.getParent());
+ Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(),
relativePartitionPath);
+
+ // NOTE: Since we're rolling back incomplete Delta Commit, it only could
have appended its
+ // block to the latest log-file
+ // TODO(HUDI-1517) use provided marker-file's path instead
+ Option<HoodieLogFile> latestLogFileOption =
FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId,
+ HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime);
+
+ // Log file can be deleted if the commit to rollback is also the commit
that created the fileGroup
+ if (latestLogFileOption.isPresent() &&
baseCommitTime.equals(instantToRollback.getTimestamp())) {
+ Path fullDeletePath = new Path(partitionPath,
latestLogFileOption.get().getFileName());
+ return new HoodieRollbackRequest(relativePartitionPath, EMPTY_STRING,
EMPTY_STRING,
+ Collections.singletonList(fullDeletePath.toString()),
+ Collections.emptyMap());
}
-
+
Map<String, Long> logFilesWithBlocsToRollback = new HashMap<>();
if (latestLogFileOption.isPresent()) {
HoodieLogFile latestLogFile = latestLogFileOption.get();
// NOTE: Marker's don't carry information about the cumulative size of
the blocks that have been appended,
// therefore we simply stub this value.
- logFilesWithBlocsToRollback =
Collections.singletonMap(latestLogFile.getPath().toString(), -1L);
+ logFilesWithBlocsToRollback =
Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(),
-1L);
}
+
return new HoodieRollbackRequest(relativePartitionPath, fileId,
baseCommitTime, Collections.emptyList(),
logFilesWithBlocsToRollback);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
index bc2a00df7b2..f9c30ca1736 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
@@ -158,8 +158,8 @@ public class DirectWriteMarkers extends WriteMarkers {
}
@Override
- protected Option<Path> create(String partitionPath, String fileName, IOType
type, boolean checkIfExists) {
- return create(getMarkerPath(partitionPath, fileName, type), checkIfExists);
+ protected Option<Path> create(String partitionPath, String dataFileName,
IOType type, boolean checkIfExists) {
+ return create(getMarkerPath(partitionPath, dataFileName, type),
checkIfExists);
}
@Override
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
index b168089cd01..9d6b7f9b9a9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
@@ -133,9 +133,9 @@ public class TimelineServerBasedWriteMarkers extends
WriteMarkers {
}
@Override
- protected Option<Path> create(String partitionPath, String fileName, IOType
type, boolean checkIfExists) {
+ protected Option<Path> create(String partitionPath, String dataFileName,
IOType type, boolean checkIfExists) {
HoodieTimer timer = HoodieTimer.start();
- String markerFileName = getMarkerFileName(fileName, type);
+ String markerFileName = getMarkerFileName(dataFileName, type);
Map<String, String> paramsMap = getConfigMap(partitionPath,
markerFileName, false);
boolean success = executeCreateMarkerRequest(paramsMap, partitionPath,
markerFileName);
@@ -149,10 +149,10 @@ public class TimelineServerBasedWriteMarkers extends
WriteMarkers {
}
@Override
- public Option<Path> createWithEarlyConflictDetection(String partitionPath,
String fileName, IOType type, boolean checkIfExists,
+ public Option<Path> createWithEarlyConflictDetection(String partitionPath,
String dataFileName, IOType type, boolean checkIfExists,
HoodieWriteConfig
config, String fileId, HoodieActiveTimeline activeTimeline) {
HoodieTimer timer = new HoodieTimer().startTimer();
- String markerFileName = getMarkerFileName(fileName, type);
+ String markerFileName = getMarkerFileName(dataFileName, type);
Map<String, String> paramsMap = getConfigMap(partitionPath,
markerFileName, true);
boolean success = executeCreateMarkerRequest(paramsMap, partitionPath,
markerFileName);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
index 8a9c5c4ad1a..93aba9c0f89 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java
@@ -59,12 +59,12 @@ public abstract class WriteMarkers implements Serializable {
* Creates a marker without checking if the marker already exists.
*
* @param partitionPath partition path in the table.
- * @param fileName file name.
+ * @param dataFileName data file name.
* @param type write IO type.
* @return the marker path.
*/
- public Option<Path> create(String partitionPath, String fileName, IOType
type) {
- return create(partitionPath, fileName, type, false);
+ public Option<Path> create(String partitionPath, String dataFileName, IOType
type) {
+ return create(partitionPath, dataFileName, type, false);
}
/**
@@ -72,14 +72,14 @@ public abstract class WriteMarkers implements Serializable {
* This can invoke marker-based early conflict detection when enabled for
multi-writers.
*
* @param partitionPath partition path in the table
- * @param fileName file name
+ * @param dataFileName data file name
* @param type write IO type
* @param writeConfig Hudi write configs.
* @param fileId File ID.
* @param activeTimeline Active timeline for the write operation.
* @return the marker path.
*/
- public Option<Path> create(String partitionPath, String fileName, IOType
type, HoodieWriteConfig writeConfig,
+ public Option<Path> create(String partitionPath, String dataFileName, IOType
type, HoodieWriteConfig writeConfig,
String fileId, HoodieActiveTimeline
activeTimeline) {
if
(writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
&& writeConfig.isEarlyConflictDetectionEnable()) {
@@ -88,23 +88,23 @@ public abstract class WriteMarkers implements Serializable {
// TODO If current is compact or clustering then create marker directly
without early conflict detection.
// Need to support early conflict detection between table service and
common writers.
if (pendingCompactionTimeline.containsInstant(instantTime) ||
pendingReplaceTimeline.containsInstant(instantTime)) {
- return create(partitionPath, fileName, type, false);
+ return create(partitionPath, dataFileName, type, false);
}
- return createWithEarlyConflictDetection(partitionPath, fileName, type,
false, writeConfig, fileId, activeTimeline);
+ return createWithEarlyConflictDetection(partitionPath, dataFileName,
type, false, writeConfig, fileId, activeTimeline);
}
- return create(partitionPath, fileName, type, false);
+ return create(partitionPath, dataFileName, type, false);
}
/**
* Creates a marker if the marker does not exist.
*
* @param partitionPath partition path in the table
- * @param fileName file name
+ * @param dataFileName data file name
* @param type write IO type
* @return the marker path or empty option if already exists
*/
- public Option<Path> createIfNotExists(String partitionPath, String fileName,
IOType type) {
- return create(partitionPath, fileName, type, true);
+ public Option<Path> createIfNotExists(String partitionPath, String
dataFileName, IOType type) {
+ return create(partitionPath, dataFileName, type, true);
}
/**
@@ -161,27 +161,27 @@ public abstract class WriteMarkers implements
Serializable {
}
/**
- * Gets the marker file name, in the format of
"[file_name].marker.[IO_type]".
+ * Gets the marker file name, in the format of
"[data_file_name].marker.[IO_type]".
*
- * @param fileName file name
+ * @param dataFileName data file name
* @param type IO type
* @return the marker file name
*/
- protected static String getMarkerFileName(String fileName, IOType type) {
- return String.format("%s%s.%s", fileName,
HoodieTableMetaClient.MARKER_EXTN, type.name());
+ protected String getMarkerFileName(String dataFileName, IOType type) {
+ return String.format("%s%s.%s", dataFileName,
HoodieTableMetaClient.MARKER_EXTN, type.name());
}
/**
* Returns the marker path. Would create the partition path first if not
exists
*
* @param partitionPath The partition path
- * @param fileName The file name
+ * @param dataFileName The data file name
* @param type The IO type
* @return path of the marker file
*/
- protected Path getMarkerPath(String partitionPath, String fileName, IOType
type) {
+ protected Path getMarkerPath(String partitionPath, String dataFileName,
IOType type) {
Path path = FSUtils.getPartitionPath(markerDirPath, partitionPath);
- String markerFileName = getMarkerFileName(fileName, type);
+ String markerFileName = getMarkerFileName(dataFileName, type);
return new Path(path, markerFileName);
}
@@ -203,7 +203,7 @@ public abstract class WriteMarkers implements Serializable {
/**
* @param context {@code HoodieEngineContext} instance.
* @param parallelism parallelism for reading the marker files in the
directory.
- * @return all the data file or log file paths of write IO type "CREATE" and
"MERGE"
+ * @return all the data file paths of write IO type "CREATE" and "MERGE"
* @throws IOException
*/
public abstract Set<String> createdAndMergedDataPaths(HoodieEngineContext
context, int parallelism) throws IOException;
@@ -218,19 +218,19 @@ public abstract class WriteMarkers implements
Serializable {
* Creates a marker.
*
* @param partitionPath partition path in the table
- * @param fileName file name
+ * @param dataFileName data file name
* @param type write IO type
* @param checkIfExists whether to check if the marker already exists
* @return the marker path or empty option if already exists and {@code
checkIfExists} is true
*/
- abstract Option<Path> create(String partitionPath, String fileName, IOType
type, boolean checkIfExists);
+ abstract Option<Path> create(String partitionPath, String dataFileName,
IOType type, boolean checkIfExists);
/**
* Creates a marker with early conflict detection for multi-writers. If
conflict is detected,
* an exception is thrown to fail the write operation.
*
* @param partitionPath partition path in the table.
- * @param fileName file name.
+ * @param dataFileName data file name.
* @param type write IO type.
* @param checkIfExists whether to check if the marker already exists.
* @param config Hudi write configs.
@@ -238,6 +238,6 @@ public abstract class WriteMarkers implements Serializable {
* @param activeTimeline Active timeline for the write operation.
* @return the marker path or empty option if already exists and {@code
checkIfExists} is true.
*/
- public abstract Option<Path> createWithEarlyConflictDetection(String
partitionPath, String fileName, IOType type, boolean checkIfExists,
+ public abstract Option<Path> createWithEarlyConflictDetection(String
partitionPath, String dataFileName, IOType type, boolean checkIfExists,
HoodieWriteConfig config, String fileId, HoodieActiveTimeline activeTimeline);
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
index 721cc5e7c5b..0cd7ed5a715 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
@@ -22,9 +22,6 @@ package org.apache.hudi.testutils.providers;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.SyncableFileSystemView;
-import org.apache.hudi.common.testutils.HoodieTestTable;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -40,15 +37,4 @@ public interface HoodieMetaClientProvider {
HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
FileStatus[] fileStatuses) {
return new HoodieTableFileSystemView(metaClient, visibleActiveTimeline,
fileStatuses);
}
-
- default SyncableFileSystemView
getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) {
- try {
- return new HoodieTableFileSystemView(metaClient,
- metaClient.getActiveTimeline(),
- HoodieTestTable.of(metaClient).listAllBaseAndLogFiles()
- );
- } catch (IOException ioe) {
- throw new HoodieIOException("Error getting file system view", ioe);
- }
- }
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
index d24a01e26a5..4b56d6a442c 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkAppendHandle.java
@@ -20,10 +20,7 @@ package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.log.HoodieLogFileWriteCallback;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkers;
@@ -52,6 +49,7 @@ public class FlinkAppendHandle<T, I, K, O>
private static final Logger LOG =
LoggerFactory.getLogger(FlinkAppendHandle.class);
private boolean isClosed = false;
+ private final WriteMarkers writeMarkers;
public FlinkAppendHandle(
HoodieWriteConfig config,
@@ -62,22 +60,17 @@ public class FlinkAppendHandle<T, I, K, O>
Iterator<HoodieRecord<T>> recordItr,
TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, partitionPath, fileId, recordItr,
taskContextSupplier);
+ this.writeMarkers = WriteMarkersFactory.get(config.getMarkersType(),
hoodieTable, instantTime);
}
- protected HoodieLogFileWriteCallback getLogWriteCallback() {
- return new AppendLogWriteCallback() {
- @Override
- public boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
- // In some rare cases, the task was pulled up again with same write
file name,
- // for e.g, reuse the small log files from last commit instant.
-
- // Just skip the marker creation if it already exists, the new data
would append to
- // the file directly.
- WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
- writeMarkers.createIfNotExists(partitionPath,
logFileToAppend.getFileName(), IOType.APPEND);
- return true;
- }
- };
+ @Override
+ protected void createMarkerFile(String partitionPath, String dataFileName) {
+ // In some rare cases, the task was pulled up again with same write file
name,
+ // for e.g, reuse the small log files from last commit instant.
+
+ // Just skip the marker creation if it already exists, the new data would
append to
+ // the file directly.
+ writeMarkers.createIfNotExists(partitionPath, dataFileName, getIOType());
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
index 848b51a6850..f0f2a5e651a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java
@@ -123,28 +123,24 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata =
mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
assertEquals(2, rollbackMetadata.size());
- int deleteLogFileNum = isUsingMarkers ? 1 : 0;
for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry :
rollbackMetadata.entrySet()) {
HoodieRollbackPartitionMetadata meta = entry.getValue();
assertEquals(0, meta.getFailedDeleteFiles().size());
- assertEquals(deleteLogFileNum, meta.getSuccessDeleteFiles().size());
+ assertEquals(0, meta.getSuccessDeleteFiles().size());
}
//4. assert file group after rollback, and compare to the rollbackstat
// assert the first partition data and log file size
- int remainingLogFileNum = isUsingMarkers ? 0 : 2;
-
List<HoodieFileGroup> firstPartitionRollBack1FileGroups =
table.getFileSystemView().getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, firstPartitionRollBack1FileGroups.size());
List<FileSlice> firstPartitionRollBack1FileSlices =
firstPartitionRollBack1FileGroups.get(0).getAllFileSlices().collect(Collectors.toList());
assertEquals(1, firstPartitionRollBack1FileSlices.size());
FileSlice firstPartitionRollBack1FileSlice =
firstPartitionRollBack1FileSlices.get(0);
List<HoodieLogFile> firstPartitionRollBackLogFiles =
firstPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
- assertEquals(remainingLogFileNum, firstPartitionRollBackLogFiles.size());
- if (remainingLogFileNum > 0) {
- firstPartitionRollBackLogFiles.removeAll(firstPartitionCommit2LogFiles);
- assertEquals(1, firstPartitionRollBackLogFiles.size());
- }
+ assertEquals(2, firstPartitionRollBackLogFiles.size());
+
+ firstPartitionRollBackLogFiles.removeAll(firstPartitionCommit2LogFiles);
+ assertEquals(1, firstPartitionRollBackLogFiles.size());
// assert the second partition data and log file size
List<HoodieFileGroup> secondPartitionRollBack1FileGroups =
table.getFileSystemView().getAllFileGroups(DEFAULT_SECOND_PARTITION_PATH).collect(Collectors.toList());
@@ -153,11 +149,10 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
assertEquals(1, secondPartitionRollBack1FileSlices.size());
FileSlice secondPartitionRollBack1FileSlice =
secondPartitionRollBack1FileSlices.get(0);
List<HoodieLogFile> secondPartitionRollBackLogFiles =
secondPartitionRollBack1FileSlice.getLogFiles().collect(Collectors.toList());
- assertEquals(remainingLogFileNum, secondPartitionRollBackLogFiles.size());
- if (remainingLogFileNum > 0) {
-
secondPartitionRollBackLogFiles.removeAll(secondPartitionCommit2LogFiles);
- assertEquals(1, secondPartitionRollBackLogFiles.size());
- }
+ assertEquals(2, secondPartitionRollBackLogFiles.size());
+
+ secondPartitionRollBackLogFiles.removeAll(secondPartitionCommit2LogFiles);
+ assertEquals(1, secondPartitionRollBackLogFiles.size());
assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table,
"002").doesMarkerDirExist());
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 5b7ec45b226..f061c152104 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -19,38 +19,24 @@
package org.apache.hudi.table.functional;
-import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
-
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.AppendResult;
-import org.apache.hudi.common.table.log.HoodieLogFileWriteCallback;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -58,8 +44,6 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.marker.WriteMarkers;
-import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -75,18 +59,12 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
-import java.io.IOException;
-import java.net.URISyntaxException;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -365,42 +343,6 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
JavaRDD<HoodieRecord> recordsRDD = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> statuses = writeClient.insert(recordsRDD,
newCommitTime);
- long expectedLogFileNum = statuses.map(writeStatus ->
(HoodieDeltaWriteStat) writeStatus.getStat())
- .flatMap(deltaWriteStat -> deltaWriteStat.getLogFiles().iterator())
- .count();
- // inject a fake log file to test marker file for log file
- HoodieDeltaWriteStat correctWriteStat = (HoodieDeltaWriteStat)
statuses.map(WriteStatus::getStat).take(1).get(0);
- assertTrue(FSUtils.isLogFile(new Path(correctWriteStat.getPath())));
- HoodieLogFile correctLogFile = new
HoodieLogFile(correctWriteStat.getPath());
- String correctWriteToken =
FSUtils.getWriteTokenFromLogPath(correctLogFile.getPath());
-
- final String fakeToken = generateFakeWriteToken(correctWriteToken);
-
- final WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(),
- HoodieSparkTable.create(config, context()), newCommitTime);
- HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder()
- .onParentPath(FSUtils.getPartitionPath(config.getBasePath(),
correctWriteStat.getPartitionPath()))
-
.withFileId(correctWriteStat.getFileId()).overBaseCommit(newCommitTime)
- .withLogVersion(correctLogFile.getLogVersion())
- .withFileSize(0L)
- .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs())
- .withRolloverLogWriteToken(fakeToken)
- .withLogWriteToken(fakeToken)
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withLogWriteCallback(new HoodieLogFileWriteCallback() {
- @Override
- public boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
- return writeMarkers.create(correctWriteStat.getPartitionPath(),
logFileToCreate.getFileName(), IOType.CREATE).isPresent();
- }
- }).build();
- AppendResult fakeAppendResult = fakeLogWriter.appendBlock(getLogBlock());
- // check marker for fake log generated
- assertTrue(writeMarkers.allMarkerFilePaths().stream().anyMatch(marker ->
marker.contains(fakeToken)));
- SyncableFileSystemView unCommittedFsView =
getFileSystemViewWithUnCommittedSlices(metaClient);
- // check fake log generated
-
assertTrue(unCommittedFsView.getAllFileSlices(correctWriteStat.getPartitionPath())
- .flatMap(FileSlice::getLogFiles).map(HoodieLogFile::getPath)
- .anyMatch(path ->
path.getName().equals(fakeAppendResult.logFile().getPath().getName())));
writeClient.commit(newCommitTime, statuses);
HoodieTable table = HoodieSparkTable.create(config, context(),
metaClient);
@@ -417,13 +359,11 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
// check the log versions start from the base version
assertTrue(allSlices.stream().map(slice ->
slice.getLogFiles().findFirst().get().getLogVersion())
.allMatch(version ->
version.equals(HoodieLogFile.LOGFILE_BASE_VERSION)));
- // check fake log file is deleted
- assertFalse(allSlices.stream().flatMap(slice ->
slice.getLogFiles().map(HoodieLogFile::getLogWriteToken))
- .anyMatch(s -> s.equals(fakeToken)));
}
numLogFiles += logFileCount;
}
- assertEquals(expectedLogFileNum, numLogFiles);
+
+ assertTrue(numLogFiles > 0);
// Do a compaction
String instantTime =
writeClient.scheduleCompaction(Option.empty()).get().toString();
HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
writeClient.compact(instantTime);
@@ -434,22 +374,4 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
writeClient.commitCompaction(instantTime,
compactionMetadata.getCommitMetadata().get(), Option.empty());
}
}
-
- private HoodieDataBlock getLogBlock() throws IOException, URISyntaxException
{
- List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
- Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
- header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
- header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
getSimpleSchema().toString());
- List<HoodieRecord> hoodieRecords =
records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
- return new HoodieAvroDataBlock(hoodieRecords, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
- }
-
- private String generateFakeWriteToken(String correctWriteToken) {
- Random random = new Random();
- String fakeToken = "";
- do {
- fakeToken = random.nextLong() + "-" + random.nextLong() + "-" +
random.nextLong();
- } while (fakeToken.equals(correctWriteToken));
- return fakeToken;
- }
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index fbc88bb2718..de8c218c1a8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -41,6 +41,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -989,4 +990,16 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
.build());
return cfgBuilder.build();
}
+
+ private SyncableFileSystemView
getFileSystemViewWithUnCommittedSlices(HoodieTableMetaClient metaClient) {
+ try {
+ return new HoodieTableFileSystemView(metaClient,
+ metaClient.getActiveTimeline(),
+ HoodieTestTable.of(metaClient).listAllBaseAndLogFiles()
+ );
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error getting file system view", ioe);
+ }
+ }
+
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
index 9ab88d3b52a..d806347b682 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java
@@ -47,7 +47,6 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
@@ -88,14 +87,11 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
@Test
public void testMarkerBasedRollbackAppend() throws Exception {
- tearDown();
- tableType = HoodieTableType.MERGE_ON_READ;
- setUp();
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String f0 = testTable.addRequestedCommit("000")
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
testTable.forCommit("001")
- .withLogMarkerFile("partA", f0, IOType.CREATE);
+ .withMarkerFile("partA", f0, IOType.APPEND);
HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
List<HoodieRollbackRequest> rollbackRequests = new
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
@@ -103,29 +99,6 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
assertEquals(1, rollbackRequests.size());
}
- @ParameterizedTest
- @EnumSource(names = {"APPEND", "CREATE"})
- public void testMarkerBasedRollbackAppendWithLogFileMarkers(IOType
testIOType) throws Exception {
- tearDown();
- tableType = HoodieTableType.MERGE_ON_READ;
- setUp();
- HoodieTestTable testTable = HoodieTestTable.of(metaClient);
- String f0 = testTable.addRequestedCommit("000")
- .getFileIdWithLogFile("partA");
- testTable.forCommit("001")
- .withLogMarkerFile("partA", f0, testIOType);
-
- HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
- List<HoodieRollbackRequest> rollbackRequests = new
MarkerBasedRollbackStrategy(hoodieTable, context, getConfig(),
- "002").getRollbackRequests(new
HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
"001"));
- assertEquals(1, rollbackRequests.size());
- HoodieRollbackRequest rollbackRequest = rollbackRequests.get(0);
- assertEquals("partA", rollbackRequest.getPartitionPath());
- assertEquals(f0, rollbackRequest.getFileId());
- assertEquals(testIOType.equals(IOType.CREATE) ? 1 : 0,
rollbackRequest.getFilesToBeDeleted().size());
- assertEquals(testIOType.equals(IOType.CREATE) ? 0 : 1,
rollbackRequest.getLogBlocksToBeDeleted().size());
- }
-
@Test
public void testCopyOnWriteRollbackWithTestTable() throws Exception {
// given: wrote some base files and corresponding markers
@@ -183,6 +156,34 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
}
}
+ @ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
+ @MethodSource("configParams")
+ public void testMergeOnReadRollback(boolean useFileListingMetadata) throws
Exception {
+ // init MERGE_ON_READ_TABLE
+ tearDown();
+ tableType = HoodieTableType.MERGE_ON_READ;
+ setUp();
+
+ HoodieWriteConfig writeConfig =
getConfigBuilder().withRollbackUsingMarkers(true).withAutoCommit(false)
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(useFileListingMetadata).build())
+ .withPath(basePath).build();
+
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ try (SparkRDDWriteClient writeClient = new
SparkRDDWriteClient(engineContext, writeConfig)) {
+
+ // rollback 2nd commit and ensure stats reflect the info.
+ List<HoodieRollbackStat> stats =
testUpdateAndRollback(useFileListingMetadata, writeConfig, writeClient);
+
+ assertEquals(3, stats.size());
+ for (HoodieRollbackStat stat : stats) {
+ assertEquals(0, stat.getSuccessDeleteFiles().size());
+ assertEquals(0, stat.getFailedDeleteFiles().size());
+ assertEquals(1, stat.getCommandBlocksCount().size());
+ stat.getCommandBlocksCount().forEach((fileStatus, len) ->
assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())));
+ }
+ }
+ }
+
@ParameterizedTest(name = TEST_NAME_WITH_PARAMS)
@MethodSource("configParams")
public void testMergeOnReadRollbackDeletesFirstAppendFiles(boolean
useFileListingMetadata) throws Exception {
@@ -262,7 +263,7 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
String f0 = testTable.addRequestedCommit("000")
.getFileIdsWithBaseFilesInPartitions("partA").get("partA");
testTable.forCommit("001")
- .withLogMarkerFile("partA", f0, IOType.APPEND);
+ .withMarkerFile("partA", f0, IOType.APPEND);
HoodieTable hoodieTable = HoodieSparkTable.create(getConfig(), context,
metaClient);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 7d8416c368a..d76db5d5966 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -22,13 +22,11 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -201,7 +199,6 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
Pair<List<HoodieRecord>, List<HoodieRecord>> inputRecords =
twoUpsertCommitDataWithTwoPartitions(firstPartitionCommit2FileSlices,
secondPartitionCommit2FileSlices, cfg, client, false);
HoodieTable table = this.getHoodieTable(metaClient, cfg);
- prepForUpgradeFromZeroToOne(table);
HoodieInstant commitInstant =
table.getPendingCommitTimeline().lastInstant().get();
// delete one of the marker files in 2nd commit if need be.
@@ -851,48 +848,6 @@ public class TestUpgradeDowngrade extends HoodieClientTestBase {
return Pair.of(records, records2);
}
- /**
- * Since how markers are generated for log file changed in Version Six, we regenerate markers in the way version zero do.
- *
- * @param table instance of {@link HoodieTable}
- */
- private void prepForUpgradeFromZeroToOne(HoodieTable table) throws
IOException {
- List<HoodieInstant> instantsToBeParsed =
- metaClient.getActiveTimeline()
- .getCommitsTimeline()
- .getInstantsAsStream()
- .collect(Collectors.toList());
- for (HoodieInstant instant : instantsToBeParsed) {
- WriteMarkers writeMarkers =
- WriteMarkersFactory.get(table.getConfig().getMarkersType(), table,
instant.getTimestamp());
- Set<String> oldMarkers = writeMarkers.allMarkerFilePaths();
- boolean hasAppendMarker = oldMarkers.stream().anyMatch(marker ->
marker.contains(IOType.APPEND.name())
- || marker.contains(IOType.CREATE.name()));
- if (hasAppendMarker) {
- // delete all markers and regenerate
- writeMarkers.deleteMarkerDir(table.getContext(), 2);
- for (String oldMarker : oldMarkers) {
- String typeStr = oldMarker.substring(oldMarker.lastIndexOf(".") + 1);
- IOType type = IOType.valueOf(typeStr);
- String partitionFilePath = WriteMarkers.stripMarkerSuffix(oldMarker);
- Path fullFilePath = new Path(basePath, partitionFilePath);
- String partitionPath = FSUtils.getRelativePartitionPath(new
Path(basePath), fullFilePath.getParent());
- if (FSUtils.isBaseFile(fullFilePath)) {
- writeMarkers.create(partitionPath, fullFilePath.getName(), type);
- } else {
- String fileId = FSUtils.getFileIdFromFilePath(fullFilePath);
- String baseInstant =
FSUtils.getBaseCommitTimeFromLogPath(fullFilePath);
- String writeToken = FSUtils.getWriteTokenFromLogPath(fullFilePath);
- writeMarkers.create(partitionPath,
- FSUtils.makeBaseFileName(baseInstant, writeToken, fileId,
table.getBaseFileFormat().getFileExtension()), type);
- }
- }
- writeMarkers.allMarkerFilePaths()
- .forEach(markerPath ->
assertFalse(markerPath.contains(HoodieLogFile.DELTA_EXTENSION)));
- }
- }
- }
-
private void prepForDowngradeFromVersion(HoodieTableVersion fromVersion)
throws IOException {
metaClient.getTableConfig().setTableVersion(fromVersion);
Path propertyFile = new Path(metaClient.getMetaPath() + "/" +
HoodieTableConfig.HOODIE_PROPERTIES_FILE);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileWriteCallback.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileWriteCallback.java
deleted file mode 100644
index 652c013cc3e..00000000000
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileWriteCallback.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.table.log;
-
-import org.apache.hudi.common.model.HoodieLogFile;
-
-/**
- * HoodieLogFileWriteCallback is trigger when specific log file operation happen
- */
-public interface HoodieLogFileWriteCallback {
- default boolean preLogFileOpen(HoodieLogFile logFileToAppend) {
- return true;
- }
-
- default boolean preLogFileCreate(HoodieLogFile logFileToCreate) {
- return true;
- }
-
- default boolean preLogFileClose(HoodieLogFile logFileToClose) {
- return true;
- }
-
- default boolean postLogFileClose(HoodieLogFile logFileToClose) {
- return true;
- }
-}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index 5e7d0806fae..d77be9a281b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -145,8 +145,6 @@ public interface HoodieLogFormat {
private String suffix;
// Rollover Log file write token
private String rolloverLogWriteToken;
- // A call back triggered with log file operation
- private HoodieLogFileWriteCallback logFileWriteCallback;
public WriterBuilder withBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
@@ -203,11 +201,6 @@ public interface HoodieLogFormat {
return this;
}
- public WriterBuilder withLogWriteCallback(HoodieLogFileWriteCallback
logFileWriteCallback) {
- this.logFileWriteCallback = logFileWriteCallback;
- return this;
- }
-
public WriterBuilder withFileSize(long fileLen) {
this.fileLen = fileLen;
return this;
@@ -240,11 +233,6 @@ public interface HoodieLogFormat {
rolloverLogWriteToken = UNKNOWN_WRITE_TOKEN;
}
- if (logFileWriteCallback == null) {
- // use a callback do nothing here as default callback.
- logFileWriteCallback = new HoodieLogFileWriteCallback() {};
- }
-
if (logVersion == null) {
LOG.info("Computing the next log version for " + logFileId + " in " +
parentPath);
Option<Pair<Integer, String>> versionAndWriteToken =
@@ -291,8 +279,7 @@ public interface HoodieLogFormat {
if (sizeThreshold == null) {
sizeThreshold = DEFAULT_SIZE_THRESHOLD;
}
- return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication,
sizeThreshold,
- rolloverLogWriteToken, logFileWriteCallback);
+ return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication,
sizeThreshold, rolloverLogWriteToken);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index 07cbfef8fe3..081c18e8f65 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -55,21 +55,18 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
private final Integer bufferSize;
private final Short replication;
private final String rolloverLogWriteToken;
- final HoodieLogFileWriteCallback logFileWriteCallback;
private boolean closed = false;
private transient Thread shutdownThread = null;
private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not
sufficiently replicated yet";
- HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer
bufferSize, Short replication, Long sizeThreshold,
- String rolloverLogWriteToken,
HoodieLogFileWriteCallback logFileWriteCallback) {
+ HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer
bufferSize, Short replication, Long sizeThreshold, String
rolloverLogWriteToken) {
this.fs = fs;
this.logFile = logFile;
this.sizeThreshold = sizeThreshold;
this.bufferSize = bufferSize;
this.replication = replication;
this.rolloverLogWriteToken = rolloverLogWriteToken;
- this.logFileWriteCallback = logFileWriteCallback;
addShutDownHook();
}
@@ -97,9 +94,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
Path path = logFile.getPath();
if (fs.exists(path)) {
boolean isAppendSupported =
StorageSchemes.isAppendSupported(fs.getScheme());
- // here we use marker file to fence concurrent append to the same file. So it is safe to use speculation in spark now.
- boolean callbackSuccess = logFileWriteCallback.preLogFileOpen(logFile);
- if (isAppendSupported && callbackSuccess) {
+ if (isAppendSupported) {
LOG.info(logFile + " exists. Appending to existing file");
try {
// open the path for append and record the offset
@@ -121,11 +116,10 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
}
}
}
- if (!isAppendSupported || !callbackSuccess) {
+ if (!isAppendSupported) {
rollOver();
createNewFile();
- String rolloverReason = isAppendSupported ? "Append not supported" :
"Callback failed";
- LOG.info(rolloverReason + ". Rolling over to " + logFile);
+ LOG.info("Append not supported.. Rolling over to " + logFile);
}
} else {
LOG.info(logFile + " does not exist. Create a new file");
@@ -236,7 +230,6 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
}
private void createNewFile() throws IOException {
- logFileWriteCallback.preLogFileCreate(logFile);
this.output =
fs.create(this.logFile.getPath(), false, bufferSize, replication,
WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
}
@@ -246,12 +239,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
if (null != shutdownThread) {
Runtime.getRuntime().removeShutdownHook(shutdownThread);
}
- logFileWriteCallback.preLogFileClose(logFile);
- try {
- closeStream();
- } finally {
- logFileWriteCallback.postLogFileClose(logFile);
- }
+ closeStream();
}
private void closeStream() throws IOException {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 469b5c8bf39..4ace66779ec 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -30,7 +30,6 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.IOType;
@@ -41,7 +40,6 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.fs.FileSystem;
@@ -98,20 +96,17 @@ public class FileCreateUtils {
return FSUtils.makeLogFileName(fileId, fileExtension, instantTime,
version, WRITE_TOKEN);
}
- public static String markerFileName(String fileName, IOType ioType) {
- return String.format("%s%s.%s", fileName,
HoodieTableMetaClient.MARKER_EXTN, ioType.name());
+ public static String markerFileName(String instantTime, String fileId,
IOType ioType) {
+ return markerFileName(instantTime, fileId, ioType, BASE_FILE_EXTENSION);
}
- public static String dataFileMarkerFileName(String instantTime, String
fileId, IOType ioType, String fileExtension, String writeToken) {
- return markerFileName(FSUtils.makeBaseFileName(instantTime, writeToken,
fileId, fileExtension), ioType);
+ public static String markerFileName(String instantTime, String fileId,
IOType ioType, String fileExtension) {
+ return markerFileName(instantTime, fileId, ioType, fileExtension,
WRITE_TOKEN);
}
- public static String logFileMarkerFileName(String instantTime, String
fileId, IOType ioType, int logVersion) {
- return logFileMarkerFileName(instantTime, fileId, ioType,
HoodieLogFile.DELTA_EXTENSION, logVersion);
- }
-
- public static String logFileMarkerFileName(String instantTime, String
fileId, IOType ioType, String fileExtension, int logVersion) {
- return markerFileName(FSUtils.makeLogFileName(fileId, fileExtension,
instantTime, logVersion, WRITE_TOKEN), ioType);
+ public static String markerFileName(String instantTime, String fileId,
IOType ioType, String fileExtension, String writeToken) {
+ return String.format("%s_%s_%s%s%s.%s", fileId, writeToken, instantTime,
fileExtension,
+ HoodieTableMetaClient.MARKER_EXTN, ioType);
}
private static void createMetaFile(String basePath, String instantTime,
String suffix, FileSystem fs) throws IOException {
@@ -373,38 +368,9 @@ public class FileCreateUtils {
public static String createMarkerFile(String basePath, String partitionPath,
String commitInstant,
String instantTime, String fileId, IOType ioType, String writeToken)
throws IOException {
- // with HUDI-1517, this method is used for creating marker files for data
files. So append type is not allowed
- ValidationUtils.checkArgument(ioType != IOType.APPEND);
- Path parentPath = Paths.get(basePath,
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
- Files.createDirectories(parentPath);
- Path markerFilePath =
parentPath.resolve(dataFileMarkerFileName(instantTime, fileId, ioType,
BASE_FILE_EXTENSION, writeToken));
- if (Files.notExists(markerFilePath)) {
- Files.createFile(markerFilePath);
- }
- return markerFilePath.toAbsolutePath().toString();
- }
-
- public static String createLogFileMarker(String basePath, String
partitionPath, String instantTime, String fileId, IOType ioType)
- throws IOException {
- return createLogFileMarker(basePath, partitionPath, instantTime, fileId,
ioType, HoodieLogFile.LOGFILE_BASE_VERSION);
- }
-
- public static String createLogFileMarker(String basePath, String
partitionPath, String instantTime, String fileId, IOType ioType, int logVersion)
- throws IOException {
- Path parentPath = Paths.get(basePath,
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
- Files.createDirectories(parentPath);
- Path markerFilePath =
parentPath.resolve(logFileMarkerFileName(instantTime, fileId, ioType,
logVersion));
- if (Files.notExists(markerFilePath)) {
- Files.createFile(markerFilePath);
- }
- return markerFilePath.toAbsolutePath().toString();
- }
-
- public static String createFileMarkerByFileName(String basePath, String
partitionPath, String instantTime, String fileName, IOType ioType)
- throws IOException {
- Path parentPath = Paths.get(basePath,
HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath);
+ Path parentPath = Paths.get(basePath,
HoodieTableMetaClient.TEMPFOLDER_NAME, commitInstant, partitionPath);
Files.createDirectories(parentPath);
- Path markerFilePath = parentPath.resolve(markerFileName(fileName, ioType));
+ Path markerFilePath = parentPath.resolve(markerFileName(instantTime,
fileId, ioType, BASE_FILE_EXTENSION, writeToken));
if (Files.notExists(markerFilePath)) {
Files.createFile(markerFilePath);
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 49095683a2b..b1dfa366dd8 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -66,6 +66,8 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
@@ -105,7 +107,6 @@ import static
org.apache.hudi.common.testutils.FileCreateUtils.createInflightDel
import static
org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit;
import static
org.apache.hudi.common.testutils.FileCreateUtils.createInflightRollbackFile;
import static
org.apache.hudi.common.testutils.FileCreateUtils.createInflightSavepoint;
-import static
org.apache.hudi.common.testutils.FileCreateUtils.createLogFileMarker;
import static
org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
import static
org.apache.hudi.common.testutils.FileCreateUtils.createReplaceCommit;
import static
org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCleanFile;
@@ -132,6 +133,7 @@ public class HoodieTestTable {
public static final String PHONY_TABLE_SCHEMA =
"{\"namespace\": \"org.apache.hudi.avro.model\", \"type\": \"record\",
\"name\": \"PhonyRecord\", \"fields\": []}";
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieTestTable.class);
private static final Random RANDOM = new Random();
protected static HoodieTestTableState testTableState;
@@ -596,11 +598,6 @@ public class HoodieTestTable {
return this;
}
- public HoodieTestTable withLogMarkerFile(String partitionPath, String
fileId, IOType ioType) throws IOException {
- createLogFileMarker(basePath, partitionPath, currentInstantTime, fileId,
ioType);
- return this;
- }
-
/**
* Insert one base file to each of the given distinct partitions.
*
@@ -779,7 +776,6 @@ public class HoodieTestTable {
public FileStatus[] listAllLogFiles(String fileExtension) throws IOException
{
return FileSystemTestUtils.listRecursive(fs, new Path(basePath)).stream()
- .filter(status ->
!status.getPath().toString().contains(HoodieTableMetaClient.METAFOLDER_NAME))
.filter(status -> status.getPath().getName().contains(fileExtension))
.toArray(FileStatus[]::new);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
index fbe16e3e52f..ecd31cd719e 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestBucketStreamWrite.java
@@ -133,13 +133,17 @@ public class ITTestBucketStreamWrite {
Path path = new Path(metaClient.getMetaPath() + Path.SEPARATOR + filename);
fs.delete(path);
+ // marker types are different for COW and MOR
+ IOType ioType = isCow ? IOType.CREATE : IOType.APPEND;
+
commitMetadata.getFileIdAndRelativePaths().forEach((fileId, relativePath)
-> {
// hacky way to reconstruct markers ¯\_(ツ)_/¯
String[] partitionFileNameSplit = relativePath.split("/");
+ String fileInstant = FSUtils.getCommitTime(partitionFileNameSplit[1]);
String partition = partitionFileNameSplit[0];
- String fileName = partitionFileNameSplit[1];
+ String writeToken = isCow ? getWriteToken(partitionFileNameSplit[1]) :
FSUtils.getWriteTokenFromLogPath(new Path(relativePath));
try {
- FileCreateUtils.createFileMarkerByFileName(tablePath, partition,
commitInstant, fileName, IOType.CREATE);
+ FileCreateUtils.createMarkerFile(tablePath, partition, commitInstant,
fileInstant, fileId, ioType, writeToken);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
index fe4c3bda3d6..5b90e266819 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCallProcedure.scala
@@ -196,7 +196,7 @@ class TestCallProcedure extends
HoodieSparkProcedureTestBase {
s"Argument: instant_time is required")
val instantTime = "101"
- FileCreateUtils.createLogFileMarker(tablePath, "", instantTime, "f0",
IOType.APPEND)
+ FileCreateUtils.createMarkerFile(tablePath, "", instantTime, "f0",
IOType.APPEND)
assertResult(1) {
FileCreateUtils.getTotalMarkerFileCount(tablePath, "", instantTime,
IOType.APPEND)
}