This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a882f440d3 [HUDI-5411] Avoid virtual key info for COW table in the input format (#7527)
a882f440d3 is described below

commit a882f440d37b4adb0ff194dad579c11dc44bbc78
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Dec 24 14:48:08 2022 +0530

    [HUDI-5411] Avoid virtual key info for COW table in the input format (#7527)
    
    Push virtual key fetch inside createFileStatusUnchecked for MOR input format
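
    In practice, HoodieCopyOnWriteTableInputFormat#createFileStatusUnchecked no longer
    takes or computes HoodieVirtualKeyInfo at all, and the MOR override resolves it from
    the HoodieTableMetaClient only when it actually builds a realtime file status. A rough
    sketch of the new MOR shape, simplified from the diff below (local variables such as
    logFiles, tableBasePath and latestCompletedInstantOpt are assumed to be set up as before):

        @Override
        protected FileStatus createFileStatusUnchecked(FileSlice fileSlice,
                                                       HiveHoodieTableFileIndex fileIndex,
                                                       HoodieTableMetaClient metaClient) {
          Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
          Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
          if (baseFileOpt.isPresent()) {
            // Virtual key info is fetched here, per file slice, only on the MOR path.
            return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath,
                latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));
          } else if (latestLogFileOpt.isPresent()) {
            return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, tableBasePath,
                latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));
          } else {
            throw new IllegalStateException("Invalid state: either base-file or log-file has to be present");
          }
        }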
---
 .../hadoop/HoodieCopyOnWriteTableInputFormat.java  | 60 ++++++----------------
 .../HoodieMergeOnReadTableInputFormat.java         | 52 ++++++++++++++-----
 2 files changed, 55 insertions(+), 57 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index 140e7ff5b6..ed9e16c8d0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -18,42 +18,38 @@
 
 package org.apache.hudi.hadoop;
 
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableQueryType;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
@@ -190,7 +186,7 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
     return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext, tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
   }
 
-  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
+  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
     Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
 
     if (baseFileOpt.isPresent()) {
@@ -215,7 +211,7 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
   @Nonnull
   private List<FileStatus> listStatusForSnapshotMode(JobConf job,
                                                      Map<String, HoodieTableMetaClient> tableMetaClientMap,
-                                                     List<Path> snapshotPaths) throws IOException {
+                                                     List<Path> snapshotPaths) {
     HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);
     List<FileStatus> targetFiles = new ArrayList<>();
 
@@ -248,14 +244,12 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
 
       Map<String, List<FileSlice>> partitionedFileSlices = fileIndex.listFileSlices();
 
-      Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt = getHoodieVirtualKeyInfo(tableMetaClient);
-
       targetFiles.addAll(
           partitionedFileSlices.values()
               .stream()
               .flatMap(Collection::stream)
               .filter(fileSlice -> checkIfValidFileSlice(fileSlice))
-              .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, virtualKeyInfoOpt))
+              .map(fileSlice -> createFileStatusUnchecked(fileSlice, fileIndex, tableMetaClient))
               .collect(Collectors.toList())
       );
     }
@@ -290,24 +284,4 @@ public class HoodieCopyOnWriteTableInputFormat extends HoodieTableInputFormat {
       throw new HoodieIOException("Failed to get file-status", ioe);
     }
   }
-
-  protected static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
-    HoodieTableConfig tableConfig = metaClient.getTableConfig();
-    if (tableConfig.populateMetaFields()) {
-      return Option.empty();
-    }
-    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
-    try {
-      Schema schema = tableSchemaResolver.getTableAvroSchema();
-      boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
-      return Option.of(
-          new HoodieVirtualKeyInfo(
-              tableConfig.getRecordKeyFieldProp(),
-              isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
-              schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
-              isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
-    } catch (Exception exception) {
-      throw new HoodieException("Fetching table schema failed with exception ", exception);
-    }
-  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 95a1a74b65..0145484017 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -18,27 +18,20 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.BootstrapBaseFileSplit;
@@ -50,6 +43,18 @@ import org.apache.hudi.hadoop.RealtimeFileStatus;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapreduce.Job;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -86,7 +91,7 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
   }
 
   @Override
-  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, Option<HoodieVirtualKeyInfo> virtualKeyInfoOpt) {
+  protected FileStatus createFileStatusUnchecked(FileSlice fileSlice, HiveHoodieTableFileIndex fileIndex, HoodieTableMetaClient metaClient) {
     Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
     Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
     Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
@@ -96,9 +101,9 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
 
     // Check if we're reading a MOR table
     if (baseFileOpt.isPresent()) {
-      return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
+      return createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));
     } else if (latestLogFileOpt.isPresent()) {
-      return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, virtualKeyInfoOpt);
+      return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles, tableBasePath, latestCompletedInstantOpt, getHoodieVirtualKeyInfo(metaClient));
     } else {
       throw new IllegalStateException("Invalid state: either base-file or log-file has to be present");
     }
@@ -384,5 +389,24 @@ public class HoodieMergeOnReadTableInputFormat extends HoodieCopyOnWriteTableInp
       throw new HoodieIOException(String.format("Failed to init %s", RealtimeFileStatus.class.getSimpleName()), e);
     }
   }
-}
 
+  private static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTableMetaClient metaClient) {
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    if (tableConfig.populateMetaFields()) {
+      return Option.empty();
+    }
+    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+    try {
+      Schema schema = tableSchemaResolver.getTableAvroSchema();
+      boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
+      return Option.of(
+          new HoodieVirtualKeyInfo(
+              tableConfig.getRecordKeyFieldProp(),
+              isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
+              schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
+              isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
+    } catch (Exception exception) {
+      throw new HoodieException("Fetching table schema failed with exception ", exception);
+    }
+  }
+}
