This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6cd17aaad8 [HUDI-5336] Fixing parsing of log files while building file groups (#7393)
6cd17aaad8 is described below
commit 6cd17aaad82eb9487cf4d52d2d2e17e9da00f221
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Dec 11 18:44:22 2022 -0800
[HUDI-5336] Fixing parsing of log files while building file groups (#7393)
---
.../java/org/apache/hudi/common/fs/FSUtils.java | 5 +--
.../apache/hudi/common/model/HoodieLogFile.java | 1 +
.../table/view/AbstractTableFileSystemView.java | 8 +++-
.../table/view/TestHoodieTableFileSystemView.java | 46 ++++++++++++++++++++++
4 files changed, 55 insertions(+), 5 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index b7ab3e6cf9..1c05270765 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -79,9 +79,8 @@ public class FSUtils {
private static final Logger LOG = LogManager.getLogger(FSUtils.class);
// Log files are of this pattern -
.b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1_1-0-1
// Archive log files are of this pattern - .commits_.archive.1_1-0-1
- private static final Pattern LOG_FILE_PATTERN =
+ public static final Pattern LOG_FILE_PATTERN =
Pattern.compile("\\.(.+)_(.*)\\.(.+)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(.cdc)?)?");
- private static final String LOG_FILE_PREFIX = ".";
private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
private static final long MIN_CLEAN_TO_KEEP = 10;
private static final long MIN_ROLLBACK_TO_KEEP = 10;
@@ -467,7 +466,7 @@ public class FSUtils {
String suffix = (writeToken == null)
? String.format("%s_%s%s.%d", fileId, baseCommitTime,
logFileExtension, version)
: String.format("%s_%s%s.%d_%s", fileId, baseCommitTime,
logFileExtension, version, writeToken);
- return LOG_FILE_PREFIX + suffix;
+ return HoodieLogFile.LOG_FILE_PREFIX + suffix;
}
public static boolean isBaseFile(Path path) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
index d4be8b5068..ba8b5e813e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
@@ -39,6 +39,7 @@ public class HoodieLogFile implements Serializable {
private static final long serialVersionUID = 1L;
public static final String DELTA_EXTENSION = ".log";
+ public static final String LOG_FILE_PREFIX = ".";
public static final Integer LOGFILE_BASE_VERSION = 1;
private static final Comparator<HoodieLogFile> LOG_FILE_COMPARATOR = new
LogFileComparator();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 6e456e8305..de1b39dccf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -61,6 +61,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.function.Predicate;
+import java.util.regex.Matcher;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -366,8 +367,13 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
* @param statuses List of FIle-Status
*/
private Stream<HoodieLogFile> convertFileStatusesToLogFiles(FileStatus[] statuses) {
- Predicate<FileStatus> rtFilePredicate = fileStatus -> fileStatus.getPath().getName()
- .contains(metaClient.getTableConfig().getLogFileFormat().getFileExtension());
+ String logFileExtension = metaClient.getTableConfig().getLogFileFormat().getFileExtension();
+ Predicate<FileStatus> rtFilePredicate = fileStatus -> {
+ String fileName = fileStatus.getPath().getName();
+ Matcher matcher = FSUtils.LOG_FILE_PATTERN.matcher(fileName);
+ // lookingAt() anchors the match at the start of the name, so prefixed names (e.g. cloud-store markers) are rejected.
+ return fileName.contains(logFileExtension) && matcher.lookingAt();
+ };
return Arrays.stream(statuses).filter(rtFilePredicate).map(HoodieLogFile::new);
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 180b601628..9e14611f80 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -292,6 +292,52 @@ public class TestHoodieTableFileSystemView extends
HoodieCommonTestHarness {
testViewForFileSlicesWithAsyncCompaction(false, true, 2, 2, true,
testBootstrap);
}
+ @Test
+ protected void testInvalidLogFiles() throws Exception {
+ String partitionPath = "2016/05/01";
+ Paths.get(basePath, partitionPath).toFile().mkdirs();
+ String fileId = UUID.randomUUID().toString();
+
+ String instantTime1 = "1";
+ String deltaInstantTime1 = "2";
+ String deltaInstantTime2 = "3";
+ String fileName1 =
+ FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0, TEST_WRITE_TOKEN);
+ String fileName2 =
+ FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 1, TEST_WRITE_TOKEN);
+ // create a dummy file that mimics cloud-store marker files: a log file name with its leading "." replaced by a prefix
+ String fileName3 = "_DUMMY_" + fileName1.substring(1);
+ // this file must not be recognized as a log file.
+
+ Paths.get(basePath, partitionPath, fileName1).toFile().createNewFile();
+ Paths.get(basePath, partitionPath, fileName2).toFile().createNewFile();
+ Paths.get(basePath, partitionPath, fileName3).toFile().createNewFile();
+ HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+
+ HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1);
+ HoodieInstant deltaInstant2 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime1);
+ HoodieInstant deltaInstant3 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime2);
+
+ saveAsComplete(commitTimeline, instant1, Option.empty());
+ saveAsComplete(commitTimeline, deltaInstant2, Option.empty());
+ saveAsComplete(commitTimeline, deltaInstant3, Option.empty());
+
+ refreshFsView();
+
+ List<HoodieBaseFile> dataFiles = roView.getLatestBaseFiles().collect(Collectors.toList());
+ assertTrue(dataFiles.isEmpty(), "No data file expected");
+ List<FileSlice> fileSliceList = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
+ assertEquals(1, fileSliceList.size());
+ FileSlice fileSlice = fileSliceList.get(0);
+ assertEquals(fileId, fileSlice.getFileId(), "File-Id must be set correctly");
+ assertFalse(fileSlice.getBaseFile().isPresent(), "Data file for base instant must not be present");
+ assertEquals(instantTime1, fileSlice.getBaseInstantTime(), "Base Instant for file-group set correctly");
+ List<HoodieLogFile> logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
+ assertEquals(2, logFiles.size(), "Correct number of log-files shows up in file-slice");
+ assertEquals(fileName2, logFiles.get(0).getFileName(), "Log File Order check");
+ assertEquals(fileName1, logFiles.get(1).getFileName(), "Log File Order check");
+ }
+
/**
* Returns all file-slices including uncommitted ones.
*