This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new eb96b2cb38 [HUDI-5353] Close file readers (#7412)
eb96b2cb38 is described below
commit eb96b2cb3803436ff0f50bf9854359884c21e7be
Author: Sagar Sumit <[email protected]>
AuthorDate: Mon Dec 12 16:20:45 2022 +0530
[HUDI-5353] Close file readers (#7412)
---
.../org/apache/hudi/index/HoodieIndexUtils.java | 3 +-
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 3 +-
.../hudi/table/action/commit/BaseMergeHelper.java | 27 ++--------------
.../table/action/commit/HoodieMergeHelper.java | 35 ++++++++++++++-------
.../run/strategy/JavaExecutionStrategy.java | 16 +++++++---
.../MultipleSparkJobExecutionStrategy.java | 36 +++++++++++-----------
.../strategy/SingleSparkJobExecutionStrategy.java | 2 --
.../org/apache/hudi/hadoop/InputSplitUtils.java | 28 -----------------
.../utils/HoodieRealtimeRecordReaderUtils.java | 27 +++-------------
.../utilities/HoodieMetadataTableValidator.java | 31 +++++++++----------
10 files changed, 78 insertions(+), 130 deletions(-)
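
The common thread in the hunks below: readers obtained from HoodieFileReaderFactory were previously opened and then abandoned, so the patch closes them, at most call sites by moving the reader into a try-with-resources statement. A minimal, self-contained Java sketch of that pattern follows; FooterReader and its methods are hypothetical stand-ins for illustration only, not Hudi classes.

    import java.io.IOException;

    public class TryWithResourcesSketch {

      // Stand-in for a footer/metadata reader that holds an open file handle.
      static final class FooterReader implements AutoCloseable {
        private final String path;

        FooterReader(String path) {
          this.path = path;
        }

        long totalRecords() {
          // Placeholder value; a real reader would parse the file footer.
          return 42L;
        }

        @Override
        public void close() {
          System.out.println("closed " + path);
        }
      }

      // Before the patch, readers like this were created inside try {} and never
      // closed on the success path; try-with-resources closes them on every path.
      static long readTotalRecords(String path) throws IOException {
        try (FooterReader reader = new FooterReader(path)) {
          return reader.totalRecords();
        }
      }

      public static void main(String[] args) throws IOException {
        System.out.println(readTotalRecords("/tmp/example.parquet"));
      }
    }
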
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index d6872276ac..6bbea356e5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -150,11 +150,10 @@ public class HoodieIndexUtils {
                                                 Configuration configuration) throws HoodieIndexException {
     ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
     List<String> foundRecordKeys = new ArrayList<>();
-    try {
+    try (HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath)) {
       // Load all rowKeys from the file, to double-confirm
       if (!candidateRecordKeys.isEmpty()) {
         HoodieTimer timer = HoodieTimer.start();
-        HoodieFileReader fileReader = HoodieFileReaderFactory.getFileReader(configuration, filePath);
         Set<String> fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
         foundRecordKeys.addAll(fileRowKeys);
         LOG.info(String.format("Checked keys against file %s, in %d ms. #candidates (%d) #found (%d)", filePath,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 18e7824c5b..97cade66e1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -452,8 +452,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }

     long oldNumWrites = 0;
-    try {
-      HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
+    try (HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath)) {
       oldNumWrites = reader.getTotalRecords();
     } catch (IOException e) {
       throw new HoodieUpsertException("Failed to check for merge data validation", e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
index 8508e18ad6..f6572aae4a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java
@@ -18,19 +18,13 @@
 package org.apache.hudi.table.action.commit;

-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.io.HoodieMergeHandle;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;

+import org.apache.avro.generic.GenericRecord;
+
 import java.io.IOException;
-import java.util.Iterator;

 /**
  * Helper to read records from previous version of base file and run Merge.
@@ -45,23 +39,6 @@ public abstract class BaseMergeHelper {
    */
  public abstract void runMerge(HoodieTable<?, ?, ?, ?> table, HoodieMergeHandle<?, ?, ?, ?> upsertHandle) throws IOException;

-  /**
-   * Create Parquet record iterator that provides a stitched view of record read from skeleton and bootstrap file.
-   * Skeleton file is a representation of the bootstrap file inside the table, with just the bare bone fields needed
-   * for indexing, writing and other functionality.
-   *
-   */
-  protected Iterator<GenericRecord> getMergingIterator(HoodieTable<?, ?, ?, ?> table,
-                                                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle,
-                                                       Path bootstrapFilePath,
-                                                       Iterator<GenericRecord> recordIterator) throws IOException {
-    Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
-    HoodieFileReader<GenericRecord> bootstrapReader =
-        HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
-    return new MergingIterator<>(recordIterator, bootstrapReader.getRecordIterator(),
-        (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
-  }
-
   /**
    * Consumer that dequeues records from queue and sends to Merge Handle.
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 43f40a778a..0b50f2a302 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -18,11 +18,8 @@
 package org.apache.hudi.table.action.commit;

-import org.apache.avro.Schema;
-import org.apache.avro.SchemaCompatibility;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -47,6 +44,12 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.QueueBasedExecutorFactory;

+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -78,10 +81,11 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends BaseMergeH
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
     Configuration hadoopConf = new Configuration(table.getHadoopConf());
-    HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
+    HoodieFileReader<GenericRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
+    HoodieFileReader<GenericRecord> bootstrapFileReader = null;

     Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
-    Schema readerSchema = reader.getSchema();
+    Schema readerSchema = baseFileReader.getSchema();

     // In case Advanced Schema Evolution is enabled we might need to rewrite currently
     // persisted records to adhere to an evolved schema
@@ -106,11 +110,17 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends BaseMergeH
       // In case writer's schema is simply a projection of the reader's one we can read
       // the records in the projected schema directly
-      ClosableIterator<GenericRecord> baseFileRecordIterator =
-          reader.getRecordIterator(isPureProjection ? writerSchema : readerSchema);
+      ClosableIterator<GenericRecord> baseFileRecordIterator = baseFileReader.getRecordIterator(isPureProjection ? writerSchema : readerSchema);
+
       if (baseFile.getBootstrapBaseFile().isPresent()) {
         Path bootstrapFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
-        recordIterator = getMergingIterator(table, mergeHandle, bootstrapFilePath, baseFileRecordIterator);
+        Configuration bootstrapFileConfig = new Configuration(table.getHadoopConf());
+        bootstrapFileReader = HoodieFileReaderFactory.getFileReader(bootstrapFileConfig, bootstrapFilePath);
+        recordIterator = new MergingIterator<>(
+            baseFileRecordIterator,
+            bootstrapFileReader.getRecordIterator(),
+            (inputRecordPair) -> HoodieAvroUtils.stitchRecords(inputRecordPair.getLeft(), inputRecordPair.getRight(), mergeHandle.getWriterSchemaWithMetaFields()));
+
       } else if (schemaEvolutionTransformerOpt.isPresent()) {
         recordIterator = new MappingIterator<>(baseFileRecordIterator, schemaEvolutionTransformerOpt.get());
@@ -132,7 +142,10 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends BaseMergeH
     } finally {
       // HUDI-2875: mergeHandle is not thread safe, we should totally terminate record inputting
       // and executor firstly and then close mergeHandle.
-      reader.close();
+      baseFileReader.close();
+      if (bootstrapFileReader != null) {
+        bootstrapFileReader.close();
+      }
       if (null != wrapper) {
         wrapper.shutdownNow();
         wrapper.awaitTermination();
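
Where a reader cannot be confined to a single try block (as in the HoodieMergeHelper hunks above, where the base and bootstrap readers feed iterators consumed by the merge executor), the patch instead tracks the readers and closes them in the finally block. A rough, self-contained sketch of that shape, again with hypothetical names rather than the Hudi classes:

    public class CloseInFinallySketch {

      // Hypothetical reader; stands in for HoodieFileReader only by analogy.
      static final class Reader implements AutoCloseable {
        private final String name;

        Reader(String name) {
          this.name = name;
        }

        @Override
        public void close() {
          System.out.println("closed " + name);
        }
      }

      // The base-file reader is always opened; the bootstrap reader only when a
      // bootstrap base file exists, so it lives in a nullable local and is
      // closed conditionally in the finally block.
      static void merge(boolean hasBootstrapFile) {
        Reader baseFileReader = new Reader("base");
        Reader bootstrapFileReader = null;
        try {
          if (hasBootstrapFile) {
            bootstrapFileReader = new Reader("bootstrap");
          }
          // ... build iterators over both readers and merge records here ...
        } finally {
          baseFileReader.close();
          if (bootstrapFileReader != null) {
            bootstrapFileReader.close();
          }
        }
      }

      public static void main(String[] args) {
        merge(true);
        merge(false);
      }
    }
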
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 456bb3cb47..c6f885fa91 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -177,9 +177,11 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
     clusteringOps.forEach(clusteringOp -> {
       long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new JavaTaskContextSupplier(), config);
       LOG.info("MaxMemoryPerCompaction run as part of clustering => " + maxMemoryPerCompaction);
+      Option<HoodieFileReader> baseFileReader = Option.empty();
+      HoodieMergedLogRecordScanner scanner = null;
       try {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
-        HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
+        scanner = HoodieMergedLogRecordScanner.newBuilder()
             .withFileSystem(table.getMetaClient().getFs())
             .withBasePath(table.getMetaClient().getBasePath())
             .withLogFilePaths(clusteringOp.getDeltaFilePaths())
@@ -195,7 +197,7 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
             .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
             .build();

-        Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
+        baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
             ? Option.empty()
             : Option.of(HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath())));
         HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
@@ -208,6 +210,13 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
       } catch (IOException e) {
         throw new HoodieClusteringException("Error reading input data for " + clusteringOp.getDataFilePath()
             + " and " + clusteringOp.getDeltaFilePaths(), e);
+      } finally {
+        if (scanner != null) {
+          scanner.close();
+        }
+        if (baseFileReader.isPresent()) {
+          baseFileReader.get().close();
+        }
       }
     });
     return records;
@@ -219,9 +228,8 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
   private List<HoodieRecord<T>> readRecordsForGroupBaseFiles(List<ClusteringOperation> clusteringOps) {
     List<HoodieRecord<T>> records = new ArrayList<>();
     clusteringOps.forEach(clusteringOp -> {
-      try {
+      try (HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()))) {
         Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
-        HoodieFileReader<IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(getHoodieTable().getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
         Iterator<IndexedRecord> recordIterator = baseFileReader.getRecordIterator(readerSchema);
         recordIterator.forEachRemaining(record -> records.add(transform(record)));
       } catch (IOException e) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 6095735d9f..074deaa621 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -104,20 +104,20 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
     boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false);
     // execute clustering for each group async and collect WriteStatus
     Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
-        clusteringPlan.getInputGroups().stream()
-            .map(inputGroup -> {
-              if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
-                return runClusteringForGroupAsyncAsRow(inputGroup,
-                    clusteringPlan.getStrategy().getStrategyParams(),
-                    shouldPreserveMetadata,
-                    instantTime);
-              }
-              return runClusteringForGroupAsync(inputGroup,
-                  clusteringPlan.getStrategy().getStrategyParams(),
-                  shouldPreserveMetadata,
-                  instantTime);
-            })
-            .collect(Collectors.toList()))
+            clusteringPlan.getInputGroups().stream()
+                .map(inputGroup -> {
+                  if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) {
+                    return runClusteringForGroupAsyncAsRow(inputGroup,
+                        clusteringPlan.getStrategy().getStrategyParams(),
+                        shouldPreserveMetadata,
+                        instantTime);
+                  }
+                  return runClusteringForGroupAsync(inputGroup,
+                      clusteringPlan.getStrategy().getStrategyParams(),
+                      shouldPreserveMetadata,
+                      instantTime);
+                })
+                .collect(Collectors.toList()))
         .join()
         .stream();
     JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
@@ -187,7 +187,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
     Option<String[]> orderByColumnsOpt =
         Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
             .map(listStr -> listStr.split(","));
-
+
     return orderByColumnsOpt.map(orderByColumns -> {
       HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
       switch (layoutOptStrategy) {
@@ -267,8 +267,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
    * Read records from baseFiles, apply updates and convert to RDD.
    */
   private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
-                                                                  List<ClusteringOperation> clusteringOps,
-                                                                  String instantTime) {
+                                                                   List<ClusteringOperation> clusteringOps,
+                                                                   String instantTime) {
     HoodieWriteConfig config = getWriteConfig();
     HoodieTable table = getHoodieTable();
     return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
@@ -318,7 +318,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
    * Read records from baseFiles and convert to RDD.
    */
   private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
-                                                                   List<ClusteringOperation> clusteringOps) {
+                                                                    List<ClusteringOperation> clusteringOps) {
     SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
     HoodieWriteConfig writeConfig = getWriteConfig();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 46d2466c5c..8606c89c49 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -103,7 +103,6 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
     return writeMetadata;
   }

-
   /**
    * Submit job to execute clustering for the group.
    */
@@ -124,7 +123,6 @@ public abstract class SingleSparkJobExecutionStrategy<T extends HoodieRecordPayl
         .flatMap(Collection::stream);
   }

-
   /**
    * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters.
    * The number of new file groups created is bounded by numOutputGroups.
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
index e485e72c25..5dcd66cd82 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputSplitUtils.java
@@ -22,14 +22,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;

 public class InputSplitUtils {

@@ -52,24 +44,4 @@ public class InputSplitUtils {
   public static boolean readBoolean(DataInput in) throws IOException {
     return in.readBoolean();
   }
-
-  /**
-   * Return correct base-file schema based on split.
-   *
-   * @param split File Split
-   * @param conf Configuration
-   * @return
-   */
-  public static Schema getBaseFileSchema(FileSplit split, Configuration conf) {
-    try {
-      if (split instanceof BootstrapBaseFileSplit) {
-        HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf,
-            ((BootstrapBaseFileSplit)(split)).getBootstrapFileSplit().getPath());
-        return HoodieAvroUtils.addMetadataFields(storageReader.getSchema());
-      }
-      return HoodieRealtimeRecordReaderUtils.readSchema(conf, split.getPath());
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read footer for parquet " + split.getPath(), e);
-    }
-  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index b87758af90..da39249229 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -18,6 +18,10 @@
 package org.apache.hudi.hadoop.utils;

+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalTypes;
@@ -25,8 +29,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
@@ -42,20 +44,13 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

-import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.sql.Timestamp;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -70,18 +65,6 @@ import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
 public class HoodieRealtimeRecordReaderUtils {
   private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReaderUtils.class);

-  /**
-   * Reads the schema from the base file.
-   */
-  public static Schema readSchema(Configuration conf, Path filePath) {
-    try {
-      HoodieFileReader storageReader = HoodieFileReaderFactory.getFileReader(conf, filePath);
-      return storageReader.getSchema();
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read schema from " + filePath, e);
-    }
-  }
-
   /**
    * get the max compaction memory in bytes from JobConf.
    */
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 6566f0c029..347078ec71 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -152,7 +152,7 @@ public class HoodieMetadataTableValidator implements Serializable {
   // Properties with source, hoodie client, key generator etc.
   private TypedProperties props;

-  private HoodieTableMetaClient metaClient;
+  private final HoodieTableMetaClient metaClient;

   protected transient Option<AsyncMetadataTableValidateService> asyncMetadataTableValidateService;
@@ -940,10 +940,10 @@ public class HoodieMetadataTableValidator implements Serializable {
    * verified in the {@link HoodieMetadataTableValidator}.
    */
  private static class HoodieMetadataValidationContext implements Serializable {
-    private HoodieTableMetaClient metaClient;
-    private HoodieTableFileSystemView fileSystemView;
-    private HoodieTableMetadata tableMetadata;
-    private boolean enableMetadataTable;
+    private final HoodieTableMetaClient metaClient;
+    private final HoodieTableFileSystemView fileSystemView;
+    private final HoodieTableMetadata tableMetadata;
+    private final boolean enableMetadataTable;
     private List<String> allColumnNameList;

    public HoodieMetadataValidationContext(
@@ -1038,30 +1038,29 @@ public class HoodieMetadataTableValidator implements Serializable {
       TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
       try {
         return schemaResolver.getTableAvroSchema().getFields().stream()
-            .map(entry -> entry.name()).collect(Collectors.toList());
+            .map(Schema.Field::name).collect(Collectors.toList());
       } catch (Exception e) {
         throw new HoodieException("Failed to get all column names for " + metaClient.getBasePath());
       }
     }

     private Option<BloomFilterData> readBloomFilterFromFile(String partitionPath, String filename) {
-      Path path = new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath), filename);
-      HoodieFileReader<IndexedRecord> fileReader;
-      try {
-        fileReader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path);
+      Path path = new Path(FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPath), filename);
+      BloomFilter bloomFilter;
+      try (HoodieFileReader<IndexedRecord> fileReader = HoodieFileReaderFactory.getFileReader(metaClient.getHadoopConf(), path)) {
+        bloomFilter = fileReader.readBloomFilter();
+        if (bloomFilter == null) {
+          Log.error("Failed to read bloom filter for " + path);
+          return Option.empty();
+        }
       } catch (IOException e) {
         Log.error("Failed to get file reader for " + path + " " + e.getMessage());
         return Option.empty();
       }
-      final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
-      if (fileBloomFilter == null) {
-        Log.error("Failed to read bloom filter for " + path);
-        return Option.empty();
-      }
       return Option.of(BloomFilterData.builder()
           .setPartitionPath(partitionPath)
           .setFilename(filename)
-          .setBloomFilter(ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes()))
+          .setBloomFilter(ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()))
           .build());
     }
   }