This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a882f440d3 [HUDI-5411] Avoid virtual key info for COW table in the input format (#7527)
a882f440d3 is described below
commit a882f440d37b4adb0ff194dad579c11dc44bbc78
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Dec 24 14:48:08 2022 +0530
[HUDI-5411] Avoid virtual key info for COW table in the input format (#7527)
Push virtual key fetch inside createFileStatusUnchecked for MOR input format
---
.../hadoop/HoodieCopyOnWriteTableInputFormat.java | 60 ++++++----------------
.../HoodieMergeOnReadTableInputFormat.java | 52 ++++++++++++++-----
2 files changed, 55 insertions(+), 57 deletions(-)
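In short, the copy-on-write input format no longer resolves HoodieVirtualKeyInfo while listing file slices; the merge-on-read override of createFileStatusUnchecked derives it from the HoodieTableMetaClient it now receives. As a rough, self-contained illustration of that pattern (hypothetical class and method names, not Hudi's actual API), a sketch might look like:

    // Hypothetical sketch: push an expensive, subclass-only lookup out of the base
    // class and into the override that actually needs it.
    import java.util.Optional;

    class MetaClient {
      boolean populateMetaFields() { return true; }   // COW-style table: meta fields present
      String recordKeyField() { return "uuid"; }
    }

    class CowInputFormat {
      // COW path: only the cheap handle is passed; no key lookup happens here.
      String createFileStatus(String fileSlice, MetaClient metaClient) {
        return "FileStatus(" + fileSlice + ")";
      }
    }

    class MorInputFormat extends CowInputFormat {
      @Override
      String createFileStatus(String fileSlice, MetaClient metaClient) {
        // Only the MOR override resolves the (potentially schema-reading) key info.
        Optional<String> virtualKey = metaClient.populateMetaFields()
            ? Optional.empty()
            : Optional.of(metaClient.recordKeyField());
        return "RealtimeFileStatus(" + fileSlice + ", key=" + virtualKey + ")";
      }
    }

    public class Sketch {
      public static void main(String[] args) {
        MetaClient metaClient = new MetaClient();
        System.out.println(new CowInputFormat().createFileStatus("f1", metaClient));
        System.out.println(new MorInputFormat().createFileStatus("f1", metaClient));
      }
    }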
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index 140e7ff5b6..ed9e16c8d0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -18,42 +18,38 @@
package org.apache.hudi.hadoop;
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableQueryType;
-import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
@@ -190,7 +186,7 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
}
-  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
+  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
if (baseFileOpt.isPresent()) {
@@ -215,7 +211,7 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
@Nonnull
private List<FileStatus> listStatusForSnapshotMode(JobConf job,
                                                    Map<String, HoodieTableMetaClient> tableMetaClientMap,
-                                                   List<Path> snapshotPaths) throws IOException {
+                                                   List<Path> snapshotPaths) {
HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);
List<FileStatus> targetFiles = new ArrayList<>();
@@ -248,14 +244,12 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();
- Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient);
-
targetFiles.addAll(
partitionedFileSlices.values()
.stream()
.flatMap(Collection::stream)
.filter(fileSlice -> checkIfValidFileSlice(fileSlice))
-          .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt))
+          .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, tableMetaClient))
.collect(Collectors.toList())
);
}
@@ -290,24 +284,4 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
throw new HoodieIOException("Failed to get file-status", ioe);
}
}
-
-  protected static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
-    HoodieTableConfig tableConfig = metaClient.getTableConfig();
-    if (tableConfig.populateMetaFields()) {
-      return Option.empty();
-    }
-    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
-    try {
-      Schema schema = tableSchemaResolver.getTableAvroSchema();
-      boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
-      return Option.of(
-          new HoodieVirtualKeyInfo(
-              tableConfig.getRecordKeyFieldProp(),
-              isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
-              schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
-              isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
-    } catch (Exception exception) {
-      throw new HoodieException("Fetching table schema failed with exception ", exception);
-    }
-  }
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 95a1a74b65..0145484017 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -18,27 +18,20 @@
package org.apache.hudi.hadoop.realtime;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
@@ -50,6 +43,18 @@ import org.apache.hudi.hadoop.RealtimeFileStatus;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapreduce.Job;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -86,7 +91,7 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
}
@Override
-  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
+  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
@@ -96,9 +101,9 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
// Check if we're reading a MOR table
if (baseFileOpt.isPresent()) {
-      return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
+      return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));
} else if (latestLogFileOpt.isPresent()) {
-      return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
+      return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));
} else {
throw new IllegalStateException("Invalid state: either base-file or log-file has to be present");
}
@@ -384,5 +389,24 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
}
}
-}
+  private static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    if (tableConfig.populateMetaFields()) {
+      return Option.empty();
+    }
+    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+    try {
+      Schema schema = tableSchemaResolver.getTableAvroSchema();
+      boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
+      return Option.of(
+          new HoodieVirtualKeyInfo(
+              tableConfig.getRecordKeyFieldProp(),
+              isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
+              schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
+              isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
+    } catch (Exception exception) {
+      throw new HoodieException("Fetching table schema failed with exception ", exception);
+    }
+  }
+}