This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eb96b2cb38 [HUDI-5353] Close file readers (#7412)
eb96b2cb38 is described below

commit eb96b2cb3803436ff0f50bf9854359884c21e7be
Author: Sagar Sumit <[email protected]>
AuthorDate: Mon Dec 12 16:20:45 2022 +0530

    [HUDI-5353] Close file readers (#7412)
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  3 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  3 +-
 .../hudi/table/action/commit/BaseMergeHelper.java  | 27 ++--------------
 .../table/action/commit/HoodieMergeHelper.java     | 35 ++++++++++++++-------
 .../run/strategy/JavaExecutionStrategy.java        | 16 +++++++---
 .../MultipleSparkJobExecutionStrategy.java         | 36 +++++++++++-----------
 .../strategy/SingleSparkJobExecutionStrategy.java  |  2 --
 .../org/apache/hudi/hadoop/InputSplitUtils.java    | 28 -----------------
 .../utils/HoodieRealtimeRecordReaderUtils.java     | 27 +++-------------
 .../utilities/HoodieMetadataTableValidator.java    | 31 +++++++++----------
 10 files changed, 78 insertions(+), 130 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index d6872276ac..6bbea356e5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -150,11 +150,10 @@ public class HoodieIndexUtils {
                                                 Configuration configuration) 
throws HoodieIndexException {
     ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
     List<String> foundRecordKeys = new ArrayList<>();
-    try {
+    try (HoodieFileReader fileReader = 
HoodieFileReaderFactory.getFileReader(configuration, filePath)) {
       // Load all rowKeys from the file, to double-confirm
       if (!candidateRecordKeys.isEmpty()) {
         HoodieTimer timer = HoodieTimer.start();
-        HoodieFileReader fileReader = 
HoodieFileReaderFactory.getFileReader(configuration, filePath);
         Set<String> fileRowKeys = fileReader.filterRowKeys(new 
TreeSet<>(candidateRecordKeys));
         foundRecordKeys.addAll(fileRowKeys);
         LOG.info(String.format("Checked keys against file %s, in %d ms. 
#candidates (%d) #found (%d)", filePath,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 18e7824c5b..97cade66e1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -452,8 +452,7 @@ public class HoodieMergeHandle<T extends 
HoodieRecordPayload, I, K, O> extends H
     }
 
     long oldNumWrites = 0;
-    try {
-      HoodieFileReader reader = 
HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
+    try (HoodieFileReader reader = 
HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), 
oldFilePath)) {
       oldNumWrites = reader.getTotalRecords();
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to check for merge data 
validation", e);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 8508e18ad6..f6572aae4a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -18,19 +18,13 @@
 
 package org.apache.hudi.table.action.commit;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.avro.generic.GenericRecord;
+
 import java.io.IOException;
-import java.util.Iterator;
 
 /**
  * Helper to read records from previous version of base file and run Merge.
@@ -45,23 +39,6 @@ public abstract class BaseMergeHelper {
    */
   public abstract void runMerge(HoodieTable<?, ?, ?, ?> table, 
HoodieMergeHandle<?, ?, ?, ?> upsertHandle) throws IOException;
 
-  /**
-   * Create Parquet record iterator that provides a stitched view of record 
read from skeleton and bootstrap file.
-   * Skeleton file is a representation of the bootstrap file inside the table, 
with just the bare bone fields needed
-   * for indexing, writing and other functionality.
-   *
-   */
-  protected Iterator<GenericRecord> getMergingIterator(HoodieTable<?, ?, ?, ?> 
table,
-                                                       HoodieMergeHandle<?, ?, 
?, ?> mergeHandle,
-                                                       Path bootstrapFilePath,
-                                                       Iterator<GenericRecord> 
recordIterator) throws IOException {
-    Configuration bootstrapFileConfig = new 
Configuration(table.getHadoopConf());
-    HoodieFileReader<GenericRecord> bootstrapReader =
-        HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, 
bootstrapFilePath);
-    return new MergingIterator<>(recordIterator, 
bootstrapReader.getRecordIterator(),
-        (inputRecordPair) -> 
HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), 
inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
-  }
-
   /**
    * Consumer that dequeues records from queue and sends to Merge Handle.
    */
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 43f40a778a..0b50f2a302 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -18,11 +18,8 @@
 
 package org.apache.hudi.table.action.commit;
 
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaCompatibility;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -47,6 +44,12 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.QueueBasedExecutorFactory;
 
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -78,10 +81,11 @@ public class HoodieMergeHelper<T extends 
HoodieRecordPayload> extends BaseMergeH
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
 
     Configuration hadoopConf = new Configuration(table.getHadoopConf());
-    HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
+    HoodieFileReader<GenericRecord> baseFileReader = 
HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
+    HoodieFileReader<GenericRecord> bootstrapFileReader = null;
 
     Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
-    Schema readerSchema = reader.getSchema();
+    Schema readerSchema = baseFileReader.getSchema();
 
     // In case Advanced Schema Evolution is enabled we might need to rewrite 
currently
     // persisted records to adhere to an evolved schema
@@ -106,11 +110,17 @@ public class HoodieMergeHelper<T extends 
HoodieRecordPayload> extends BaseMergeH
 
       // In case writer's schema is simply a projection of the reader's one we 
can read
       // the records in the projected schema directly
-      ClosableIterator<GenericRecord> baseFileRecordIterator =
-          reader.getRecordIterator(isPureProjection ? writerSchema : 
readerSchema);
+      ClosableIterator<GenericRecord> baseFileRecordIterator = 
baseFileReader.getRecordIterator(isPureProjection ? writerSchema : 
readerSchema);
+
       if (baseFile.getBootstrapBaseFile().isPresent()) {
         Path bootstrapFilePath = new 
Path(baseFile.getBootstrapBaseFile().get().getPath());
-        recordIterator = getMergingIterator(table, mergeHandle, 
bootstrapFilePath, baseFileRecordIterator);
+        Configuration bootstrapFileConfig = new 
Configuration(table.getHadoopConf());
+        bootstrapFileReader = 
HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+        recordIterator = new MergingIterator<>(
+            baseFileRecordIterator,
+            bootstrapFileReader.getRecordIterator(),
+            (inputRecordPair) -> 
HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), 
inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
+
       } else if (schemaEvolutionTransformerOpt.isPresent()) {
         recordIterator = new MappingIterator<>(baseFileRecordIterator,
             schemaEvolutionTransformerOpt.get());
@@ -132,7 +142,10 @@ public class HoodieMergeHelper<T extends 
HoodieRecordPayload> extends BaseMergeH
     } finally {
       // HUDI-2875: mergeHandle is not thread safe, we should totally 
terminate record inputting
       // and executor firstly and then close mergeHandle.
-      reader.close();
+      baseFileReader.close();
+      if (bootstrapFileReader != null) {
+        bootstrapFileReader.close();
+      }
       if (null != wrapper) {
         wrapper.shutdownNow();
         wrapper.awaitTermination();
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 456bb3cb47..c6f885fa91 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -177,9 +177,11 @@ public abstract class JavaExecutionStrategy<T extends 
HoodieRecordPayload<T>>
     clusteringOps.forEach(clusteringOp -> {
       long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new 
JavaTaskContextSupplier(), config);
       LOG.info("MaxMemoryPerCompaction run as part of clustering => " + 
maxMemoryPerCompaction);
+      Option<HoodieFileReader> baseFileReader = Option.empty();
+      HoodieMergedLogRecordScanner scanner = null;
       try {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()));
-        HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        scanner = HoodieMergedLogRecordScanner.newBuilder()
             .withFileSystem(table.getMetaClient().getFs())
             .withBasePath(table.getMetaClient().getBasePath())
             .withLogFilePaths(clusteringOp.getDeltaFilePaths())
@@ -195,7 +197,7 @@ public abstract class JavaExecutionStrategy<T extends 
HoodieRecordPayload<T>>
             
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
             .build();
 
-        Option<HoodieFileReader> baseFileReader = 
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+        baseFileReader = 
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
             ? Option.empty()
             : 
Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new 
Path(clusteringOp.getDataFilePath())));
         HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
@@ -208,6 +210,13 @@ public abstract class JavaExecutionStrategy<T extends 
HoodieRecordPayload<T>>
       } catch (IOException e) {
         throw new HoodieClusteringException("Error reading input data for " + 
clusteringOp.getDataFilePath()
             + " and " + clusteringOp.getDeltaFilePaths(), e);
+      } finally {
+        if (scanner != null) {
+          scanner.close();
+        }
+        if (baseFileReader.isPresent()) {
+          baseFileReader.get().close();
+        }
       }
     });
     return records;
@@ -219,9 +228,8 @@ public abstract class JavaExecutionStrategy<T extends 
HoodieRecordPayload<T>>
   private List<HoodieRecord<T>> 
readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
     List<HoodieRecord<T>> records = new ArrayList<>();
     clusteringOps.forEach(clusteringOp -> {
-      try {
+      try (HoodieFileReader<IndexedRecord> baseFileReader = 
HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new 
Path(clusteringOp.getDataFilePath()))) {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(getWriteConfig().getSchema()));
-        HoodieFileReader<IndexedRecord> baseFileReader = 
HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new 
Path(clusteringOp.getDataFilePath()));
         Iterator<IndexedRecord> recordIterator = 
baseFileReader.getRecordIterator(readerSchema);
         recordIterator.forEachRemaining(record -> 
records.add(transform(record)));
       } catch (IOException e) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 6095735d9f..074deaa621 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -104,20 +104,20 @@ public abstract class MultipleSparkJobExecutionStrategy<T 
extends HoodieRecordPa
     boolean shouldPreserveMetadata = 
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
     // execute clustering for each group async and collect WriteStatus
     Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
-        clusteringPlan.getInputGroups().stream()
-            .map(inputGroup -> {
-              if 
(getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable",
 false)) {
-                return runClusteringForGroupAsyncAsRow(inputGroup,
-                    clusteringPlan.getStrategy().getStrategyParams(),
-                    shouldPreserveMetadata,
-                    instantTime);
-              }
-              return runClusteringForGroupAsync(inputGroup,
-                  clusteringPlan.getStrategy().getStrategyParams(),
-                  shouldPreserveMetadata,
-                  instantTime);
-            })
-            .collect(Collectors.toList()))
+            clusteringPlan.getInputGroups().stream()
+                .map(inputGroup -> {
+                  if 
(getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable",
 false)) {
+                    return runClusteringForGroupAsyncAsRow(inputGroup,
+                        clusteringPlan.getStrategy().getStrategyParams(),
+                        shouldPreserveMetadata,
+                        instantTime);
+                  }
+                  return runClusteringForGroupAsync(inputGroup,
+                      clusteringPlan.getStrategy().getStrategyParams(),
+                      shouldPreserveMetadata,
+                      instantTime);
+                })
+                .collect(Collectors.toList()))
         .join()
         .stream();
     JavaRDD<WriteStatus>[] writeStatuses = 
convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
@@ -187,7 +187,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T 
extends HoodieRecordPa
     Option<String[]> orderByColumnsOpt =
         Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
             .map(listStr -> listStr.split(","));
-    
+
     return orderByColumnsOpt.map(orderByColumns -> {
       HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = 
getWriteConfig().getLayoutOptimizationStrategy();
       switch (layoutOptStrategy) {
@@ -267,8 +267,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T 
extends HoodieRecordPa
    * Read records from baseFiles, apply updates and convert to RDD.
    */
   private HoodieData<HoodieRecord<T>> 
readRecordsForGroupWithLogs(JavaSparkContext jsc,
-                                                               
List<ClusteringOperation> clusteringOps,
-                                                               String 
instantTime) {
+                                                                  
List<ClusteringOperation> clusteringOps,
+                                                                  String 
instantTime) {
     HoodieWriteConfig config = getWriteConfig();
     HoodieTable table = getHoodieTable();
     return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, 
clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
@@ -318,7 +318,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T 
extends HoodieRecordPa
    * Read records from baseFiles and convert to RDD.
    */
   private HoodieData<HoodieRecord<T>> 
readRecordsForGroupBaseFiles(JavaSparkContext jsc,
-                                                                
List<ClusteringOperation> clusteringOps) {
+                                                                   
List<ClusteringOperation> clusteringOps) {
     SerializableConfiguration hadoopConf = new 
SerializableConfiguration(getHoodieTable().getHadoopConf());
     HoodieWriteConfig writeConfig = getWriteConfig();
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 46d2466c5c..8606c89c49 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -103,7 +103,6 @@ public abstract class SingleSparkJobExecutionStrategy<T 
extends HoodieRecordPayl
     return writeMetadata;
   }
 
-
   /**
    * Submit job to execute clustering for the group.
    */
@@ -124,7 +123,6 @@ public abstract class SingleSparkJobExecutionStrategy<T 
extends HoodieRecordPayl
         .flatMap(Collection::stream);
   }
 
-
   /**
    * Execute clustering to write inputRecords into new files as defined by 
rules in strategy parameters.
    * The number of new file groups created is bounded by numOutputGroups.
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
index e485e72c25..5dcd66cd82 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
@@ -22,14 +22,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 
 public class InputSplitUtils {
 
@@ -52,24 +44,4 @@ public class InputSplitUtils {
   public static boolean readBoolean(DataInput in) throws IOException {
     return in.readBoolean();
   }
-
-  /**
-   * Return correct base-file schema based on split.
-   *
-   * @param split File Split
-   * @param conf Configuration
-   * @return
-   */
-  public static Schema getBaseFileSchema(FileSplit split, Configuration conf) {
-    try {
-      if (split instanceof BootstrapBaseFileSplit) {
-        HoodieFileReader storageReader = 
HoodieFileReaderFactory.getFileReader(conf,
-            
((BootstrapBaseFileSplit)(split)).getBootstrapFileSplit().getPath());
-        return HoodieAvroUtils.addMetadataFields(storageReader.getSchema());
-      }
-      return HoodieRealtimeRecordReaderUtils.readSchema(conf, split.getPath());
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read footer for parquet " + 
split.getPath(), e);
-    }
-  }
 }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index b87758af90..da39249229 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalTypes;
@@ -25,8 +29,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -42,20 +44,13 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.sql.Timestamp;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -70,18 +65,6 @@ import static 
org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
 public class HoodieRealtimeRecordReaderUtils {
   private static final Logger LOG = 
LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);
 
-  /**
-   * Reads the schema from the base file.
-   */
-  public static Schema readSchema(Configuration conf, Path filePath) {
-    try {
-      HoodieFileReader storageReader = 
HoodieFileReaderFactory.getFileReader(conf, filePath);
-      return storageReader.getSchema();
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read schema from " + filePath, e);
-    }
-  }
-
   /**
    * get the max compaction memory in bytes from JobConf.
    */
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 6566f0c029..347078ec71 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -152,7 +152,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
   // Properties with source, hoodie client, key generator etc.
   private TypedProperties props;
 
-  private HoodieTableMetaClient metaClient;
+  private final HoodieTableMetaClient metaClient;
 
   protected transient Option<AsyncMetadataTableValidateService> 
asyncMetadataTableValidateService;
 
@@ -940,10 +940,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
    * verified in the {@link HoodieMetadataTableValidator}.
    */
   private static class HoodieMetadataValidationContext implements Serializable 
{
-    private HoodieTableMetaClient metaClient;
-    private HoodieTableFileSystemView fileSystemView;
-    private HoodieTableMetadata tableMetadata;
-    private boolean enableMetadataTable;
+    private final HoodieTableMetaClient metaClient;
+    private final HoodieTableFileSystemView fileSystemView;
+    private final HoodieTableMetadata tableMetadata;
+    private final boolean enableMetadataTable;
     private List<String> allColumnNameList;
 
     public HoodieMetadataValidationContext(
@@ -1038,30 +1038,29 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
       try {
         return schemaResolver.getTableAvroSchema().getFields().stream()
-            .map(entry -> entry.name()).collect(Collectors.toList());
+            .map(Schema.Field::name).collect(Collectors.toList());
       } catch (Exception e) {
         throw new HoodieException("Failed to get all column names for " + 
metaClient.getBasePath());
       }
     }
 
     private Option<BloomFilterData> readBloomFilterFromFile(String 
partitionPath, String filename) {
-      Path path = new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), 
partitionPath), filename);
-      HoodieFileReader<IndexedRecord> fileReader;
-      try {
-        fileReader = 
HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path);
+      Path path = new 
Path(FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPath), 
filename);
+      BloomFilter bloomFilter;
+      try (HoodieFileReader<IndexedRecord> fileReader = 
HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path)) {
+        bloomFilter = fileReader.readBloomFilter();
+        if (bloomFilter == null) {
+          Log.error("Failed to read bloom filter for " + path);
+          return Option.empty();
+        }
       } catch (IOException e) {
         Log.error("Failed to get file reader for " + path + " " + 
e.getMessage());
         return Option.empty();
       }
-      final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
-      if (fileBloomFilter == null) {
-        Log.error("Failed to read bloom filter for " + path);
-        return Option.empty();
-      }
       return Option.of(BloomFilterData.builder()
           .setPartitionPath(partitionPath)
           .setFilename(filename)
-          
.setBloomFilter(ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()))
+          
.setBloomFilter(ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()))
           .build());
     }
   }

Reply via email to