This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2567ada6d5 Revert "[HUDI-5409] Avoid file index and use fs view cache in COW input format (#7493)" (#7526)
2567ada6d5 is described below
commit 2567ada6d5654bf8463cb55e25f5d662aa5a8475
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Dec 24 09:06:49 2022 +0530
Revert "[HUDI-5409] Avoid file index and use fs view cache in COW input format (#7493)" (#7526)
This reverts commit cc1c1e7b33d9c95e5a2ba0e9a1db428d1e1b2a00.
---
.../hudi/execution/TestDisruptorMessageQueue.java | 4 +-
.../hadoop/HoodieCopyOnWriteTableInputFormat.java | 144 +++++++--------------
.../HoodieMergeOnReadTableInputFormat.java | 30 ++---
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 2 +-
4 files changed, 61 insertions(+), 119 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
index 7d324e5296..76c22f96e7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
@@ -39,7 +39,6 @@ import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import scala.Tuple2;
@@ -86,11 +85,10 @@ public class TestDisruptorMessageQueue extends
HoodieClientTestHarness {
// Test to ensure that we are reading all records from queue iterator in the
same order
// without any exceptions.
- @Disabled("Disabled for unblocking 0.12.2 release. Disruptor queue is not
part of this minor release. Tracked in HUDI-5410")
@SuppressWarnings("unchecked")
@Test
@Timeout(value = 60)
- public void testRecordReading() {
+ public void testRecordReading() throws Exception {
final List<HoodieRecord> hoodieRecords =
dataGen.generateInserts(instantTime, 100);
ArrayList<HoodieRecord> beforeRecord = new ArrayList<>();
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index ce441bf2e2..140e7ff5b6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -18,9 +18,21 @@
package org.apache.hudi.hadoop;
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -30,8 +42,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.FileSystemViewManager;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
@@ -39,42 +50,21 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import javax.annotation.Nonnull;
-
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
-import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
-import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
-import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
-import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.buildMetadataConfig;
-import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getFileStatus;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* Base implementation of the Hive's {@link FileInputFormat} allowing for
reading of Hudi's
@@ -200,7 +190,7 @@ public class HoodieCopyOnWriteTableInputFormat extends
HoodieTableInputFormat {
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext,
tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}
- protected FileStatus createFileStatusUnchecked(FileSlice fileSlice,
Option<HoodieInstant> instantOpt, String basePath, Option<HoodieVirtualKeyInfo>
virtualKeyInfoOpt) {
+ protected FileStatus createFileStatusUnchecked(FileSlice fileSlice,
HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo>
virtualKeyInfoOpt) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
if (baseFileOpt.isPresent()) {
@@ -233,7 +223,6 @@ public class HoodieCopyOnWriteTableInputFormat extends
HoodieTableInputFormat {
Map<HoodieTableMetaClient, List<Path>> groupedPaths =
HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(),
snapshotPaths);
- Map<HoodieTableMetaClient, HoodieTableFileSystemView> fsViewCache = new
HashMap<>();
for (Map.Entry<HoodieTableMetaClient, List<Path>> entry :
groupedPaths.entrySet()) {
HoodieTableMetaClient tableMetaClient = entry.getKey();
@@ -247,83 +236,33 @@ public class HoodieCopyOnWriteTableInputFormat extends
HoodieTableInputFormat {
boolean shouldIncludePendingCommits =
HoodieHiveUtils.shouldIncludePendingCommits(job,
tableMetaClient.getTableConfig().getTableName());
- // NOTE: Fetching virtual key info is a costly operation as it needs to
load the commit metadata.
- // This is only needed for MOR realtime splits. Hence, for COW
tables, this can be avoided.
- Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt =
tableMetaClient.getTableType().equals(COPY_ON_WRITE) ? Option.empty() :
getHoodieVirtualKeyInfo(tableMetaClient);
- String basePath = tableMetaClient.getBasePathV2().toString();
-
- if (conf.getBoolean(ENABLE.key(), DEFAULT_METADATA_ENABLE_FOR_READERS)
&& HoodieTableMetadataUtil.isFilesPartitionAvailable(tableMetaClient)) {
- HiveHoodieTableFileIndex fileIndex =
- new HiveHoodieTableFileIndex(
- engineContext,
- tableMetaClient,
- props,
- HoodieTableQueryType.SNAPSHOT,
- partitionPaths,
- queryCommitInstant,
- shouldIncludePendingCommits);
-
- Map<String, List<FileSlice>> partitionedFileSlices =
fileIndex.listFileSlices();
-
- targetFiles.addAll(
- partitionedFileSlices.values()
- .stream()
- .flatMap(Collection::stream)
- .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
- .map(fileSlice -> createFileStatusUnchecked(fileSlice,
fileIndex.getLatestCompletedInstant(), basePath, virtualKeyInfoOpt))
- .collect(Collectors.toList())
- );
- } else {
- HoodieTimeline timeline = getActiveTimeline(tableMetaClient,
shouldIncludePendingCommits);
- Option<String> queryInstant = queryCommitInstant.or(() ->
timeline.lastInstant().map(HoodieInstant::getTimestamp));
- validateInstant(timeline, queryInstant);
-
- try {
- HoodieTableFileSystemView fsView =
fsViewCache.computeIfAbsent(tableMetaClient, hoodieTableMetaClient ->
-
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext,
hoodieTableMetaClient, buildMetadataConfig(job), timeline));
-
- List<FileSlice> filteredFileSlices = new ArrayList<>();
-
- for (Path p : entry.getValue()) {
- String relativePartitionPath =
FSUtils.getRelativePartitionPath(new Path(basePath), p);
-
- List<FileSlice> fileSlices = queryInstant.map(
- instant ->
fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, instant))
- .orElse(fsView.getLatestFileSlices(relativePartitionPath))
- .collect(Collectors.toList());
-
- filteredFileSlices.addAll(fileSlices);
- }
-
- targetFiles.addAll(
- filteredFileSlices.stream()
- .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
- .map(fileSlice -> createFileStatusUnchecked(fileSlice,
timeline.filterCompletedInstants().lastInstant(), basePath, virtualKeyInfoOpt))
- .collect(Collectors.toList()));
- } finally {
- fsViewCache.forEach(((metaClient, fsView) -> fsView.close()));
- }
- }
+ HiveHoodieTableFileIndex fileIndex =
+ new HiveHoodieTableFileIndex(
+ engineContext,
+ tableMetaClient,
+ props,
+ HoodieTableQueryType.SNAPSHOT,
+ partitionPaths,
+ queryCommitInstant,
+ shouldIncludePendingCommits);
+
+ Map<String, List<FileSlice>> partitionedFileSlices =
fileIndex.listFileSlices();
+
+ Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt =
getHoodieVirtualKeyInfo(tableMetaClient);
+
+ targetFiles.addAll(
+ partitionedFileSlices.values()
+ .stream()
+ .flatMap(Collection::stream)
+ .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
+ .map(fileSlice -> createFileStatusUnchecked(fileSlice,
fileIndex, virtualKeyInfoOpt))
+ .collect(Collectors.toList())
+ );
}
return targetFiles;
}
- private static HoodieTimeline getActiveTimeline(HoodieTableMetaClient
metaClient, boolean shouldIncludePendingCommits) {
- HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline();
- if (shouldIncludePendingCommits) {
- return timeline;
- } else {
- return timeline.filterCompletedAndCompactionInstants();
- }
- }
-
- private static void validateInstant(HoodieTimeline activeTimeline,
Option<String> queryInstant) {
- if (queryInstant.isPresent() &&
!activeTimeline.containsInstant(queryInstant.get())) {
- throw new HoodieIOException(String.format("Query instant (%s) not found
in the timeline", queryInstant.get()));
- }
- }
-
protected boolean checkIfValidFileSlice(FileSlice fileSlice) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
@@ -338,10 +277,15 @@ public class HoodieCopyOnWriteTableInputFormat extends
HoodieTableInputFormat {
}
}
+ private void validate(List<FileStatus> targetFiles, List<FileStatus>
legacyFileStatuses) {
+ List<FileStatus> diff = CollectionUtils.diff(targetFiles,
legacyFileStatuses);
+ checkState(diff.isEmpty(), "Should be empty");
+ }
+
@Nonnull
protected static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
try {
- return getFileStatus(baseFile);
+ return HoodieInputFormatUtils.getFileStatus(baseFile);
} catch (IOException ioe) {
throw new HoodieIOException("Failed to get file-status", ioe);
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 6a198f9ad3..95a1a74b65 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -18,6 +18,16 @@
package org.apache.hudi.hadoop.realtime;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -33,23 +43,13 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
+import org.apache.hudi.hadoop.HiveHoodieTableFileIndex;
import org.apache.hudi.hadoop.HoodieCopyOnWriteTableInputFormat;
import org.apache.hudi.hadoop.LocatedFileStatusWithBootstrapBaseFile;
import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapreduce.Job;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -86,14 +86,14 @@ public class HoodieMergeOnReadTableInputFormat extends
HoodieCopyOnWriteTableInp
}
@Override
- protected FileStatus createFileStatusUnchecked(FileSlice fileSlice,
- Option<HoodieInstant>
latestCompletedInstantOpt,
- String tableBasePath,
- Option<HoodieVirtualKeyInfo>
virtualKeyInfoOpt) {
+ protected FileStatus createFileStatusUnchecked(FileSlice fileSlice,
HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo>
virtualKeyInfoOpt) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
+ Option<HoodieInstant> latestCompletedInstantOpt =
fileIndex.getLatestCompletedInstant();
+ String tableBasePath = fileIndex.getBasePath().toString();
+
// Check if we're reading a MOR table
if (baseFileOpt.isPresent()) {
return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles,
tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index eeeedc061e..f9c2c9ca29 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -449,7 +449,7 @@ public class HoodieInputFormatUtils {
* @param dataFile
* @return
*/
- public static HoodieBaseFile refreshFileStatus(Configuration conf,
HoodieBaseFile dataFile) {
+ private static HoodieBaseFile refreshFileStatus(Configuration conf,
HoodieBaseFile dataFile) {
Path dataPath = dataFile.getFileStatus().getPath();
try {
if (dataFile.getFileSize() == 0) {