This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c357b4d18ca87ba4d7f8d533cc4d5565ef2b7620 Author: Vova Kolmakov <[email protected]> AuthorDate: Mon Apr 15 11:31:11 2024 +0700 [HUDI-7584] Always read log block lazily and remove readBlockLazily argument (#11015) --- .../hudi/cli/commands/HoodieLogFileCommand.java | 3 - .../cli/commands/TestHoodieLogFileCommand.java | 3 - .../org/apache/hudi/io/HoodieMergedReadHandle.java | 1 - .../hudi/table/action/compact/HoodieCompactor.java | 1 - .../run/strategy/JavaExecutionStrategy.java | 1 - .../MultipleSparkJobExecutionStrategy.java | 1 - .../hudi/common/table/TableSchemaResolver.java | 21 ++-- .../table/log/AbstractHoodieLogRecordReader.java | 65 +++++------ .../table/log/HoodieCDCLogRecordIterator.java | 3 +- .../hudi/common/table/log/HoodieLogFileReader.java | 69 +++++------ .../hudi/common/table/log/HoodieLogFormat.java | 13 +-- .../common/table/log/HoodieLogFormatReader.java | 14 +-- .../table/log/HoodieMergedLogRecordScanner.java | 27 ++--- .../table/log/HoodieUnMergedLogRecordScanner.java | 12 +- .../hudi/common/table/log/LogReaderUtils.java | 2 +- .../metadata/HoodieMetadataLogRecordReader.java | 1 - .../hudi/metadata/HoodieTableMetadataUtil.java | 1 - .../common/functional/TestHoodieLogFormat.java | 128 +++++++-------------- .../examples/quickstart/TestQuickstartData.java | 1 - .../hudi/sink/clustering/ClusteringOperator.java | 1 - .../org/apache/hudi/table/format/FormatUtils.java | 6 - .../test/java/org/apache/hudi/utils/TestData.java | 1 - .../realtime/HoodieMergeOnReadSnapshotReader.java | 3 - .../realtime/RealtimeCompactedRecordReader.java | 1 - .../realtime/RealtimeUnmergedRecordReader.java | 1 - .../reader/DFSHoodieDatasetInputReader.java | 1 - .../src/main/scala/org/apache/hudi/Iterators.scala | 4 - .../ShowHoodieLogFileRecordsProcedure.scala | 1 - .../utilities/HoodieMetadataTableValidator.java | 126 +++++++++----------- 29 files changed, 188 insertions(+), 324 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index 46a9e787ea6..77d9392fcd0 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -238,9 +238,6 @@ public class HoodieLogFileCommand { .withLatestInstantTime( client.getActiveTimeline() .getCommitTimeline().lastInstant().get().getTimestamp()) - .withReadBlocksLazily( - Boolean.parseBoolean( - HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue())) .withReverseReader( Boolean.parseBoolean( HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue())) diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java index 6f75074ff29..dc9cdd1aaf1 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java @@ -241,9 +241,6 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness { .withLatestInstantTime(INSTANT_TIME) .withMaxMemorySizeInBytes( HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES) - .withReadBlocksLazily( - Boolean.parseBoolean( - HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue())) .withReverseReader( Boolean.parseBoolean( HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue())) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java index e74ab37f4b6..280e24e46b9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java @@ -126,7 +126,6 @@ public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, K .withReaderSchema(readerSchema) .withLatestInstantTime(instantTime) .withMaxMemorySizeInBytes(IOUtils.getMaxMemoryPerCompaction(hoodieTable.getTaskContextSupplier(), config)) - .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) .withReverseReader(config.getCompactionReverseLogReadEnabled()) .withBufferSize(config.getMaxDFSStreamBufferSize()) .withSpillableMapBasePath(config.getSpillableMapBasePath()) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java index 940ab9886c3..461794a8f75 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java @@ -197,7 +197,6 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable { .withInstantRange(instantRange) .withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema())) .withMaxMemorySizeInBytes(maxMemoryPerCompaction) - .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) .withReverseReader(config.getCompactionReverseLogReadEnabled()) .withBufferSize(config.getMaxDFSStreamBufferSize()) .withSpillableMapBasePath(config.getSpillableMapBasePath()) diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java index f73238d0210..70e8de465df 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java @@ -181,7 +181,6 @@ public abstract class JavaExecutionStrategy<T> .withReaderSchema(readerSchema) .withLatestInstantTime(instantTime) .withMaxMemorySizeInBytes(maxMemoryPerCompaction) - .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) .withReverseReader(config.getCompactionReverseLogReadEnabled()) .withBufferSize(config.getMaxDFSStreamBufferSize()) .withSpillableMapBasePath(config.getSpillableMapBasePath()) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index b1fd74a6169..62a510a0b3c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -309,7 +309,6 @@ public abstract class MultipleSparkJobExecutionStrategy<T> .withReaderSchema(readerSchema) .withLatestInstantTime(instantTime) .withMaxMemorySizeInBytes(maxMemoryPerCompaction) - .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled()) .withReverseReader(config.getCompactionReverseLogReadEnabled()) .withBufferSize(config.getMaxDFSStreamBufferSize()) .withSpillableMapBasePath(config.getSpillableMapBasePath()) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 0344331ab75..c5d55cdd2c6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -74,7 +75,6 @@ import java.util.function.Supplier; import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema; import static org.apache.hudi.avro.AvroSchemaUtils.containsFieldInSchema; import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema; -import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER; /** * Helper class to read schema from data files and log files and to convert it between different formats. @@ -284,13 +284,12 @@ public class TableSchemaResolver { Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().iterator(); return Option.of(fetchSchemaFromFiles(filePaths)); } else { - LOG.warn("Could not find any data file written for commit, " - + "so could not get schema for table " + metaClient.getBasePath()); + LOG.warn("Could not find any data file written for commit, so could not get schema for table {}", metaClient.getBasePathV2()); return Option.empty(); } default: - LOG.error("Unknown table type " + metaClient.getTableType()); - throw new InvalidTableException(metaClient.getBasePath()); + LOG.error("Unknown table type {}", metaClient.getTableType()); + throw new InvalidTableException(metaClient.getBasePathV2().toString()); } } catch (IOException e) { throw new HoodieException("Failed to read data schema", e); @@ -328,7 +327,7 @@ public class TableSchemaResolver { } private MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IOException { - LOG.info("Reading schema from " + parquetFilePath); + LOG.info("Reading schema from {}", parquetFilePath); FileSystem fs = metaClient.getRawFs(); ParquetMetadata fileFooter = @@ -337,18 +336,18 @@ public class TableSchemaResolver { } private MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOException { - LOG.info("Reading schema from " + hFilePath); + LOG.info("Reading schema from {}", hFilePath); FileSystem fs = metaClient.getRawFs(); try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) - .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, fs.getConf(), hFilePath)) { + .getFileReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER, fs.getConf(), hFilePath)) { return convertAvroSchemaToParquet(fileReader.getSchema()); } } private MessageType readSchemaFromORCBaseFile(Path orcFilePath) throws IOException { - LOG.info("Reading schema from " + orcFilePath); + LOG.info("Reading schema from {}", orcFilePath); FileSystem fs = metaClient.getRawFs(); HoodieAvroOrcReader orcReader = new HoodieAvroOrcReader(fs.getConf(), orcFilePath); @@ -388,7 +387,7 @@ public class TableSchemaResolver { // We only need to read the schema from the log block header, // so we read the block lazily to avoid reading block content // containing the records - try (Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, true, false)) { + try (Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, false)) { HoodieDataBlock lastBlock = null; while (reader.hasNext()) { HoodieLogBlock block = reader.next(); @@ -473,7 +472,7 @@ public class TableSchemaResolver { Schema tableAvroSchema = getTableAvroSchemaFromDataFile(); return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null; } catch (Exception e) { - LOG.info(String.format("Failed to read operation field from avro schema (%s)", e.getMessage())); + LOG.info("Failed to read operation field from avro schema ({})", e.getMessage()); return false; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 6ce80da6d4a..affde833721 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -66,7 +66,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.hudi.common.table.log.block.HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BLOCK_IDENTIFIER; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES; import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME; @@ -150,7 +149,7 @@ public abstract class AbstractHoodieLogRecordReader { private final boolean enableOptimizedLogBlocksScan; protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths, - Schema readerSchema, String latestInstantTime, boolean readBlocksLazily, + Schema readerSchema, String latestInstantTime, boolean reverseReader, int bufferSize, Option<InstantRange> instantRange, boolean withOperationField, boolean forceFullScan, Option<String> partitionNameOverride, @@ -243,12 +242,12 @@ public abstract class AbstractHoodieLogRecordReader { // Iterate over the paths logFormatReaderWrapper = new HoodieLogFormatReader(fs, logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()), - readerSchema, true, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema); + readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema); Set<HoodieLogFile> scannedLogFiles = new HashSet<>(); while (logFormatReaderWrapper.hasNext()) { HoodieLogFile logFile = logFormatReaderWrapper.getLogFile(); - LOG.info("Scanning log file " + logFile); + LOG.info("Scanning log file {}", logFile); scannedLogFiles.add(logFile); totalLogFiles.set(scannedLogFiles.size()); // Use the HoodieLogFileReader to iterate through the blocks in the log file @@ -284,14 +283,14 @@ public abstract class AbstractHoodieLogRecordReader { case HFILE_DATA_BLOCK: case AVRO_DATA_BLOCK: case PARQUET_DATA_BLOCK: - LOG.info("Reading a data block from file " + logFile.getPath() + " at instant " + instantTime); + LOG.info("Reading a data block from file {} at instant {}", logFile.getPath(), instantTime); // store the current block currentInstantLogBlocks.push(logBlock); validLogBlockInstants.add(logBlock); updateBlockSequenceTracker(logBlock, instantTime, blockSeqNumber, attemptNumber, blockSequenceMapPerCommit, blockIdentifiersPresent); break; case DELETE_BLOCK: - LOG.info("Reading a delete block from file " + logFile.getPath()); + LOG.info("Reading a delete block from file {}", logFile.getPath()); // store deletes so can be rolled back currentInstantLogBlocks.push(logBlock); validLogBlockInstants.add(logBlock); @@ -314,8 +313,7 @@ public abstract class AbstractHoodieLogRecordReader { HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock; String targetInstantForCommandBlock = logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME); - LOG.info(String.format("Reading a command block %s with targetInstantTime %s from file %s", commandBlock.getType(), targetInstantForCommandBlock, - logFile.getPath())); + LOG.info("Reading a command block {} with targetInstantTime {} from file {}", commandBlock.getType(), targetInstantForCommandBlock, logFile.getPath()); switch (commandBlock.getType()) { // there can be different types of command blocks case ROLLBACK_BLOCK: // Rollback older read log block(s) @@ -328,13 +326,12 @@ public abstract class AbstractHoodieLogRecordReader { currentInstantLogBlocks.removeIf(block -> { // handle corrupt blocks separately since they may not have metadata if (block.getBlockType() == CORRUPT_BLOCK) { - LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath()); + LOG.info("Rolling back the last corrupted log block read in {}", logFile.getPath()); return true; } if (targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME))) { // rollback older data block or delete block - LOG.info(String.format("Rolling back an older log block read from %s with instantTime %s", - logFile.getPath(), targetInstantForCommandBlock)); + LOG.info("Rolling back an older log block read from {} with instantTime {}", logFile.getPath(), targetInstantForCommandBlock); return true; } return false; @@ -347,13 +344,12 @@ public abstract class AbstractHoodieLogRecordReader { validLogBlockInstants = validLogBlockInstants.stream().filter(block -> { // handle corrupt blocks separately since they may not have metadata if (block.getBlockType() == CORRUPT_BLOCK) { - LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath()); + LOG.info("Rolling back the last corrupted log block read in {}", logFile.getPath()); return true; } if (targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME))) { // rollback older data block or delete block - LOG.info(String.format("Rolling back an older log block read from %s with instantTime %s", - logFile.getPath(), targetInstantForCommandBlock)); + LOG.info("Rolling back an older log block read from {} with instantTime {}", logFile.getPath(), targetInstantForCommandBlock); return false; } return true; @@ -361,10 +357,9 @@ public abstract class AbstractHoodieLogRecordReader { final int numBlocksRolledBack = instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size(); totalRollbacks.addAndGet(numBlocksRolledBack); - LOG.info("Number of applied rollback blocks " + numBlocksRolledBack); + LOG.info("Number of applied rollback blocks {}", numBlocksRolledBack); if (numBlocksRolledBack == 0) { - LOG.warn(String.format("TargetInstantTime %s invalid or extra rollback command block in %s", - targetInstantForCommandBlock, logFile.getPath())); + LOG.warn("TargetInstantTime {} invalid or extra rollback command block in {}", targetInstantForCommandBlock, logFile.getPath()); } break; default: @@ -372,7 +367,7 @@ public abstract class AbstractHoodieLogRecordReader { } break; case CORRUPT_BLOCK: - LOG.info("Found a corrupt block in " + logFile.getPath()); + LOG.info("Found a corrupt block in {}", logFile.getPath()); totalCorruptBlocks.incrementAndGet(); // If there is a corrupt block - we will assume that this was the next data block currentInstantLogBlocks.push(logBlock); @@ -460,10 +455,8 @@ public abstract class AbstractHoodieLogRecordReader { for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> perAttemptEntries : perCommitBlockSequences.entrySet()) { Long attemptNo = perAttemptEntries.getKey(); if (maxAttemptNo != attemptNo) { - List<HoodieLogBlock> logBlocksToRemove = perCommitBlockSequences.get(attemptNo).stream().map(pair -> pair.getValue()).collect(Collectors.toList()); - logBlocksToRemove.forEach(logBlockToRemove -> { - allValidLogBlocks.remove(logBlockToRemove); - }); + List<HoodieLogBlock> logBlocksToRemove = perCommitBlockSequences.get(attemptNo).stream().map(Pair::getValue).collect(Collectors.toList()); + logBlocksToRemove.forEach(logBlockToRemove -> allValidLogBlocks.remove(logBlockToRemove)); } } } @@ -478,12 +471,12 @@ public abstract class AbstractHoodieLogRecordReader { LOG.warn("Duplicate log blocks found "); for (Map.Entry<String, Map<Long, List<Pair<Integer, HoodieLogBlock>>>> entry : blockSequenceMapPerCommit.entrySet()) { if (entry.getValue().size() > 1) { - LOG.warn("\tCommit time " + entry.getKey()); + LOG.warn("\tCommit time {}", entry.getKey()); Map<Long, List<Pair<Integer, HoodieLogBlock>>> value = entry.getValue(); for (Map.Entry<Long, List<Pair<Integer, HoodieLogBlock>>> attemptsSeq : value.entrySet()) { - LOG.warn("\t\tAttempt number " + attemptsSeq.getKey()); - attemptsSeq.getValue().forEach(entryValue -> LOG.warn("\t\t\tLog block sequence no : " + entryValue.getKey() + ", log file " - + entryValue.getValue().getBlockContentLocation().get().getLogFile().getPath().toString())); + LOG.warn("\t\tAttempt number {}", attemptsSeq.getKey()); + attemptsSeq.getValue().forEach(entryValue -> LOG.warn("\t\t\tLog block sequence no : {}, log file {}", + entryValue.getKey(), entryValue.getValue().getBlockContentLocation().get().getLogFile().getPath().toString())); } } } @@ -556,7 +549,7 @@ public abstract class AbstractHoodieLogRecordReader { // Iterate over the paths logFormatReaderWrapper = new HoodieLogFormatReader(fs, logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()), - readerSchema, true, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema); + readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema); /** * Scanning log blocks and placing the compacted blocks at the right place require two traversals. @@ -603,7 +596,7 @@ public abstract class AbstractHoodieLogRecordReader { */ while (logFormatReaderWrapper.hasNext()) { HoodieLogFile logFile = logFormatReaderWrapper.getLogFile(); - LOG.info("Scanning log file " + logFile); + LOG.info("Scanning log file {}", logFile); scannedLogFiles.add(logFile); totalLogFiles.set(scannedLogFiles.size()); // Use the HoodieLogFileReader to iterate through the blocks in the log file @@ -612,7 +605,7 @@ public abstract class AbstractHoodieLogRecordReader { totalLogBlocks.incrementAndGet(); // Ignore the corrupt blocks. No further handling is required for them. if (logBlock.getBlockType().equals(CORRUPT_BLOCK)) { - LOG.info("Found a corrupt block in " + logFile.getPath()); + LOG.info("Found a corrupt block in {}", logFile.getPath()); totalCorruptBlocks.incrementAndGet(); continue; } @@ -647,12 +640,12 @@ public abstract class AbstractHoodieLogRecordReader { instantToBlocksMap.put(instantTime, logBlocksList); break; case COMMAND_BLOCK: - LOG.info("Reading a command block from file " + logFile.getPath()); + LOG.info("Reading a command block from file {}", logFile.getPath()); // This is a command block - take appropriate action based on the command HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock; // Rollback blocks contain information of instants that are failed, collect them in a set.. - if (commandBlock.getType().equals(ROLLBACK_BLOCK)) { + if (commandBlock.getType().equals(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK)) { totalRollbacks.incrementAndGet(); String targetInstantForCommandBlock = logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME); @@ -669,7 +662,7 @@ public abstract class AbstractHoodieLogRecordReader { } if (LOG.isDebugEnabled()) { - LOG.debug("Ordered instant times seen " + orderedInstantsList); + LOG.debug("Ordered instant times seen {}", orderedInstantsList); } int numBlocksRolledBack = 0; @@ -725,10 +718,10 @@ public abstract class AbstractHoodieLogRecordReader { validBlockInstants.add(compactedFinalInstantTime); } } - LOG.info("Number of applied rollback blocks " + numBlocksRolledBack); + LOG.info("Number of applied rollback blocks {}", numBlocksRolledBack); if (LOG.isDebugEnabled()) { - LOG.info("Final view of the Block time to compactionBlockMap " + blockTimeToCompactionBlockTimeMap); + LOG.info("Final view of the Block time to compactionBlockMap {}", blockTimeToCompactionBlockTimeMap); } // merge the last read block when all the blocks are done reading @@ -816,7 +809,7 @@ public abstract class AbstractHoodieLogRecordReader { private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int numLogFilesSeen, Option<KeySpec> keySpecOpt) throws Exception { while (!logBlocks.isEmpty()) { - LOG.info("Number of remaining logblocks to merge " + logBlocks.size()); + LOG.info("Number of remaining logblocks to merge {}", logBlocks.size()); // poll the element at the bottom of the stack since that's the order it was inserted HoodieLogBlock lastBlock = logBlocks.pollLast(); switch (lastBlock.getBlockType()) { @@ -1022,8 +1015,6 @@ public abstract class AbstractHoodieLogRecordReader { public abstract Builder withLatestInstantTime(String latestInstantTime); - public abstract Builder withReadBlocksLazily(boolean readBlocksLazily); - public abstract Builder withReverseReader(boolean reverseReader); public abstract Builder withBufferSize(int bufferSize); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java index b2464345a1d..e5938bdefb0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java @@ -82,8 +82,7 @@ public class HoodieCDCLogRecordIterator implements ClosableIterator<IndexedRecor try { closeReader(); if (cdcLogFileIter.hasNext()) { - reader = new HoodieLogFileReader(fs, cdcLogFileIter.next(), cdcSchema, - HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false); + reader = new HoodieLogFileReader(fs, cdcLogFileIter.next(), cdcSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE); return reader.hasNext(); } return false; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index c7289106f48..c1daf5e32d1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.table.log; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; @@ -64,7 +65,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; -import static org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; import static org.apache.hudi.common.util.ValidationUtils.checkState; @@ -77,6 +77,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 1024 * 1024; // 1 MB private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFileReader.class); + private static final String REVERSE_LOG_READER_HAS_NOT_BEEN_ENABLED = "Reverse log reader has not been enabled"; private final FileSystem fs; private final Configuration hadoopConf; @@ -86,7 +87,6 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private final Schema readerSchema; private final InternalSchema internalSchema; private final String keyField; - private final boolean readBlockLazily; private long reverseLogFilePosition; private long lastReverseLogFilePosition; private final boolean reverseReader; @@ -94,26 +94,22 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { private boolean closed = false; private SeekableDataInputStream inputStream; - public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily) throws IOException { - this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false); + public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize) throws IOException { + this(fs, logFile, readerSchema, bufferSize, false); } public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily, boolean reverseReader) throws IOException { - this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false, - HoodieRecord.RECORD_KEY_METADATA_FIELD); + boolean reverseReader) throws IOException { + this(fs, logFile, readerSchema, bufferSize, reverseReader, false, HoodieRecord.RECORD_KEY_METADATA_FIELD); } - public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups, - String keyField) throws IOException { - this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema()); + public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean reverseReader, + boolean enableRecordLookups, String keyField) throws IOException { + this(fs, logFile, readerSchema, bufferSize, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema()); } - public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups, - String keyField, InternalSchema internalSchema) throws IOException { + public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean reverseReader, + boolean enableRecordLookups, String keyField, InternalSchema internalSchema) throws IOException { this.fs = fs; this.hadoopConf = fs.getConf(); // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path @@ -124,7 +120,6 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { this.bufferSize = bufferSize; this.inputStream = getDataInputStream(fs, this.logFile, bufferSize); this.readerSchema = readerSchema; - this.readBlockLazily = readBlockLazily; this.reverseReader = reverseReader; this.enableRecordLookups = enableRecordLookups; this.keyField = keyField; @@ -180,7 +175,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { // 6. Read the content or skip content based on IO vs Memory trade-off by client long contentPosition = inputStream.getPos(); - boolean shouldReadLazily = readBlockLazily && nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION; + boolean shouldReadLazily = nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION; Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, contentLength, shouldReadLazily); // 7. Read footer if any @@ -204,7 +199,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) { return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, internalSchema); } else { - return new HoodieAvroDataBlock(() -> getDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, + return new HoodieAvroDataBlock(() -> getDataInputStream(fs, this.logFile, bufferSize), content, true, logBlockContentLoc, getTargetReaderSchemaForBlock(), header, footer, keyField); } @@ -212,25 +207,25 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION, String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION)); return new HoodieHFileDataBlock( - () -> getDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, + () -> getDataInputStream(fs, this.logFile, bufferSize), content, true, logBlockContentLoc, Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath(), - ConfigUtils.getBooleanWithAltKeys(fs.getConf(), USE_NATIVE_HFILE_READER)); + ConfigUtils.getBooleanWithAltKeys(fs.getConf(), HoodieReaderConfig.USE_NATIVE_HFILE_READER)); case PARQUET_DATA_BLOCK: checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION, String.format("Parquet block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION)); - return new HoodieParquetDataBlock(() -> getDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, + return new HoodieParquetDataBlock(() -> getDataInputStream(fs, this.logFile, bufferSize), content, true, logBlockContentLoc, getTargetReaderSchemaForBlock(), header, footer, keyField); case DELETE_BLOCK: - return new HoodieDeleteBlock(content, () -> getDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), header, footer); + return new HoodieDeleteBlock(content, () -> getDataInputStream(fs, this.logFile, bufferSize), true, Option.of(logBlockContentLoc), header, footer); case COMMAND_BLOCK: - return new HoodieCommandBlock(content, () -> getDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), header, footer); + return new HoodieCommandBlock(content, () -> getDataInputStream(fs, this.logFile, bufferSize), true, Option.of(logBlockContentLoc), header, footer); case CDC_DATA_BLOCK: - return new HoodieCDCDataBlock(() -> getDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, readerSchema, header, keyField); + return new HoodieCDCDataBlock(() -> getDataInputStream(fs, this.logFile, bufferSize), content, true, logBlockContentLoc, readerSchema, header, keyField); default: throw new HoodieNotSupportedException("Unsupported Block " + blockType); @@ -261,18 +256,18 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { } private HoodieLogBlock createCorruptBlock(long blockStartPos) throws IOException { - LOG.info("Log " + logFile + " has a corrupted block at " + blockStartPos); + LOG.info("Log {} has a corrupted block at {}", logFile, blockStartPos); inputStream.seek(blockStartPos); long nextBlockOffset = scanForNextAvailableBlockOffset(); // Rewind to the initial start and read corrupted bytes till the nextBlockOffset inputStream.seek(blockStartPos); - LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset); + LOG.info("Next available block in {} starts at {}", logFile, nextBlockOffset); int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos); long contentPosition = inputStream.getPos(); - Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily); + Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, true); HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, logFile, contentPosition, corruptedBlockSize, nextBlockOffset); - return new HoodieCorruptBlock(corruptedBytes, () -> getDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>()); + return new HoodieCorruptBlock(corruptedBytes, () -> getDataInputStream(fs, this.logFile, bufferSize), true, Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>()); } private boolean isBlockCorrupted(int blocksize) throws IOException { @@ -293,7 +288,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { // So we have to shorten the footer block size by the size of magic hash blockSizeFromFooter = inputStream.readLong() - magicBuffer.length; } catch (EOFException e) { - LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF"); + LOG.info("Found corrupted block in file {} with block size({}) running past EOF", logFile, blocksize); // this is corrupt // This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream // release-3.1.0-RC1/DFSInputStream.java#L1455 @@ -303,8 +298,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { } if (blocksize != blockSizeFromFooter) { - LOG.info("Found corrupted block in file " + logFile + ". Header block size(" + blocksize - + ") did not match the footer block size(" + blockSizeFromFooter + ")"); + LOG.info("Found corrupted block in file {}. Header block size({}) did not match the footer block size({})", logFile, blocksize, blockSizeFromFooter); inputStream.seek(currentPos); return true; } @@ -315,7 +309,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { return false; } catch (CorruptedLogFileException e) { // This is a corrupted block - LOG.info("Found corrupted block in file " + logFile + ". No magic hash found right after footer block size entry"); + LOG.info("Found corrupted block in file {}. No magic hash found right after footer block size entry", logFile); return true; } finally { inputStream.seek(currentPos); @@ -348,7 +342,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { @Override public void close() throws IOException { if (!closed) { - LOG.info("Closing Log file reader " + logFile.getFileName()); + LOG.info("Closing Log file reader {}", logFile.getFileName()); if (null != this.inputStream) { this.inputStream.close(); } @@ -411,7 +405,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { public boolean hasPrev() { try { if (!this.reverseReader) { - throw new HoodieNotSupportedException("Reverse log reader has not been enabled"); + throw new HoodieNotSupportedException(REVERSE_LOG_READER_HAS_NOT_BEEN_ENABLED); } reverseLogFilePosition = lastReverseLogFilePosition; reverseLogFilePosition -= Long.BYTES; @@ -433,7 +427,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { public HoodieLogBlock prev() throws IOException { if (!this.reverseReader) { - throw new HoodieNotSupportedException("Reverse log reader has not been enabled"); + throw new HoodieNotSupportedException(REVERSE_LOG_READER_HAS_NOT_BEEN_ENABLED); } long blockSize = inputStream.readLong(); long blockEndPos = inputStream.getPos(); @@ -443,8 +437,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { } catch (Exception e) { // this could be a corrupt block inputStream.seek(blockEndPos); - throw new CorruptedLogFileException("Found possible corrupted block, cannot read log file in reverse, " - + "fallback to forward reading of logfile"); + throw new CorruptedLogFileException("Found possible corrupted block, cannot read log file in reverse, fallback to forward reading of logfile"); } boolean hasNext = hasNext(); reverseLogFilePosition -= blockSize; @@ -460,7 +453,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { public long moveToPrev() throws IOException { if (!this.reverseReader) { - throw new HoodieNotSupportedException("Reverse log reader has not been enabled"); + throw new HoodieNotSupportedException(REVERSE_LOG_READER_HAS_NOT_BEEN_ENABLED); } inputStream.seek(lastReverseLogFilePosition); long blockSize = inputStream.readLong(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java index 5e7d0806fae..12a80c07a91 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java @@ -258,8 +258,7 @@ public interface HoodieLogFormat { // Use rollover write token as write token to create new log file with tokens logWriteToken = rolloverLogWriteToken; } - LOG.info("Computed the next log version for " + logFileId + " in " + parentPath + " as " + logVersion - + " with write-token " + logWriteToken); + LOG.info("Computed the next log version for {} in {} as {} with write-token {}", logFileId, parentPath, logVersion, logWriteToken); } if (logWriteToken == null) { @@ -279,7 +278,7 @@ public interface HoodieLogFormat { Path logPath = new Path(parentPath, FSUtils.makeLogFileName(logFileId, fileExtension, instantTime, logVersion, logWriteToken)); - LOG.info("HoodieLogFile on path " + logPath); + LOG.info("HoodieLogFile on path {}", logPath); HoodieLogFile logFile = new HoodieLogFile(logPath, fileLen); if (bufferSize == null) { @@ -302,13 +301,11 @@ public interface HoodieLogFormat { static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException { - return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false); + return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE); } - static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, - boolean readBlockLazily, boolean reverseReader) throws IOException { - return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, readBlockLazily, - reverseReader); + static HoodieLogFormat.Reader newReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, boolean reverseReader) throws IOException { + return new HoodieLogFileReader(fs, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, reverseReader); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java index 3c4737af8d0..f21091e5df0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java @@ -41,27 +41,25 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { private final FileSystem fs; private final Schema readerSchema; private final InternalSchema internalSchema; - private final boolean readBlocksLazily; private final String recordKeyField; private final boolean enableInlineReading; private final int bufferSize; private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatReader.class); - HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily, + HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean reverseLogReader, int bufferSize, boolean enableRecordLookups, String recordKeyField, InternalSchema internalSchema) throws IOException { this.logFiles = logFiles; this.fs = fs; this.readerSchema = readerSchema; - this.readBlocksLazily = readBlocksLazily; this.bufferSize = bufferSize; this.recordKeyField = recordKeyField; this.enableInlineReading = enableRecordLookups; this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema; - if (logFiles.size() > 0) { + if (!logFiles.isEmpty()) { HoodieLogFile nextLogFile = logFiles.remove(0); - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, + this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, false, enableRecordLookups, recordKeyField, internalSchema); } } @@ -83,16 +81,16 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader { return false; } else if (currentReader.hasNext()) { return true; - } else if (logFiles.size() > 0) { + } else if (!logFiles.isEmpty()) { try { HoodieLogFile nextLogFile = logFiles.remove(0); this.currentReader.close(); - this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, + this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, false, enableInlineReading, recordKeyField, internalSchema); } catch (IOException io) { throw new HoodieIOException("unable to initialize read with log file ", io); } - LOG.info("Moving to the next reader for logfile " + currentReader.getLogFile()); + LOG.info("Moving to the next reader for logfile {}", currentReader.getLogFile()); return hasNext(); } return false; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 9062641f1a7..c3cf2f97ab8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -92,7 +92,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader @SuppressWarnings("unchecked") private HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema, - String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily, + String latestInstantTime, Long maxMemorySizeInBytes, boolean reverseReader, int bufferSize, String spillableMapBasePath, Option<InstantRange> instantRange, ExternalSpillableMap.DiskMapType diskMapType, @@ -103,7 +103,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, Option<HoodieTableMetaClient> hoodieTableMetaClientOption) { - super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, + super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger, hoodieTableMetaClientOption); try { @@ -206,12 +206,14 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer(); this.numMergedRecordsInLog = records.size(); - LOG.info("Number of log files scanned => " + logFilePaths.size()); - LOG.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes); - LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries()); - LOG.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize()); - LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries()); - LOG.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes()); + if (LOG.isInfoEnabled()) { + LOG.info("Number of log files scanned => {}", logFilePaths.size()); + LOG.info("MaxMemoryInBytes allowed for compaction => {}", maxMemorySizeInBytes); + LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => {}", records.getInMemoryMapNumEntries()); + LOG.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => {}", records.getCurrentInMemoryMapSize()); + LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => {}", records.getDiskBasedMapNumEntries()); + LOG.info("Size of file spilled to disk => {}", records.getSizeOfFileOnDiskInBytes()); + } } @Override @@ -321,7 +323,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader private Schema readerSchema; private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema(); private String latestInstantTime; - private boolean readBlocksLazily; private boolean reverseReader; private int bufferSize; // specific configurations @@ -373,12 +374,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader return this; } - @Override - public Builder withReadBlocksLazily(boolean readBlocksLazily) { - this.readBlocksLazily = readBlocksLazily; - return this; - } - @Override public Builder withReverseReader(boolean reverseReader) { this.reverseReader = reverseReader; @@ -470,7 +465,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader ValidationUtils.checkArgument(recordMerger != null); return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, - latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader, + latestInstantTime, maxMemorySizeInBytes, reverseReader, bufferSize, spillableMapBasePath, instantRange, diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan, Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java index 4d870618e7b..492d6299a0d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java @@ -43,11 +43,11 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade private final LogRecordScannerCallback callback; private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema, - String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize, + String latestInstantTime, boolean reverseReader, int bufferSize, LogRecordScannerCallback callback, Option<InstantRange> instantRange, InternalSchema internalSchema, boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger, Option<HoodieTableMetaClient> hoodieTableMetaClientOption) { - super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, + super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange, false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger, hoodieTableMetaClientOption); this.callback = callback; @@ -104,7 +104,6 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade private Schema readerSchema; private InternalSchema internalSchema; private String latestInstantTime; - private boolean readBlocksLazily; private boolean reverseReader; private int bufferSize; private Option<InstantRange> instantRange = Option.empty(); @@ -147,11 +146,6 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade return this; } - public Builder withReadBlocksLazily(boolean readBlocksLazily) { - this.readBlocksLazily = readBlocksLazily; - return this; - } - public Builder withReverseReader(boolean reverseReader) { this.reverseReader = reverseReader; return this; @@ -196,7 +190,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade ValidationUtils.checkArgument(recordMerger != null); return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, - latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange, + latestInstantTime, reverseReader, bufferSize, callback, instantRange, internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient)); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java index 768085c322c..93383df332f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java @@ -52,7 +52,7 @@ public class LogReaderUtils { private static Schema readSchemaFromLogFileInReverse(FileSystem fs, HoodieActiveTimeline activeTimeline, HoodieLogFile hoodieLogFile) throws IOException { // set length for the HoodieLogFile as it will be leveraged by HoodieLogFormat.Reader with reverseReading enabled - Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, true, true); + Reader reader = HoodieLogFormat.newReader(fs, hoodieLogFile, null, true); Schema writerSchema = null; HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); while (reader.hasPrev()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java index 900260b9413..3cd0a9b0da1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java @@ -139,7 +139,6 @@ public class HoodieMetadataLogRecordReader implements Closeable { // NOTE: Merging of Metadata Table's records is currently handled using {@code HoodiePreCombineAvroRecordMerger} // for compatibility purposes; In the future it {@code HoodieMetadataPayload} semantic // will be migrated to its own custom instance of {@code RecordMerger} - .withReadBlocksLazily(true) .withReverseReader(false) .withOperationField(false); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 480ae76a5a1..b25d6741b83 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1801,7 +1801,6 @@ public class HoodieTableMetadataUtil { .withLogFilePaths(logFilePaths) .withReaderSchema(HoodieAvroUtils.getRecordKeySchema()) .withLatestInstantTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse("")) - .withReadBlocksLazily(configuration.get().getBoolean("", true)) .withReverseReader(false) .withMaxMemorySizeInBytes(configuration.get().getLongBytes(MAX_MEMORY_FOR_COMPACTION.key(), DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)) .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath()) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index d4cb5021afc..9e7314cf245 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -440,8 +440,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } writer.close(); - Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), - true, true); + Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), true); assertTrue(reader.hasNext(), "We wrote a block, we should be able to read it"); HoodieLogBlock nextBlock = reader.next(); assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next block should be a data block"); @@ -635,7 +634,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { @@ -657,7 +655,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReaderSchema(schema) .withLatestInstantTime("100") .withMaxMemorySizeInBytes(10240L) - .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableBasePath) @@ -763,7 +760,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReaderSchema(schema) .withLatestInstantTime("100") .withMaxMemorySizeInBytes(10240L) - .withReadBlocksLazily(true) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableBasePath) @@ -783,7 +779,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { // Generate 3 delta-log files w/ random records @@ -805,7 +800,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReaderSchema(schema) .withLatestInstantTime("100") .withMaxMemorySizeInBytes(10240L) - .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableBasePath) @@ -873,7 +867,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testBasicAppendAndPartialScanningByKeyPrefixes(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { // Generate 3 delta-log files w/ random records @@ -895,7 +888,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReaderSchema(schema) .withLatestInstantTime("100") .withMaxMemorySizeInBytes(10240L) - .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableBasePath) @@ -1158,7 +1150,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); @@ -1194,7 +1185,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { Set<String> originalKeys = copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) .collect(Collectors.toSet()); - checkLogBlocksAndKeys("100", schema, readBlocksLazily, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, + checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 200, 200, Option.of(originalKeys)); } @@ -1202,7 +1193,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); @@ -1258,7 +1248,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { Set<String> originalKeys = copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) .collect(Collectors.toSet()); - checkLogBlocksAndKeys("102", schema, readBlocksLazily, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, + checkLogBlocksAndKeys("102", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 200, 200, Option.of(originalKeys)); } @@ -1327,7 +1317,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { Set<String> originalKeys = copyOfRecords1.stream().map(s -> ((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()) .collect(Collectors.toSet()); - checkLogBlocksAndKeys("103", schema, true, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, + checkLogBlocksAndKeys("103", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 200, 200, Option.of(originalKeys)); } @@ -1335,7 +1325,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); @@ -1393,7 +1382,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReaderSchema(schema) .withLatestInstantTime("102") .withMaxMemorySizeInBytes(10240L) - .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableBasePath) @@ -1441,7 +1429,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReaderSchema(schema) .withLatestInstantTime("103") .withMaxMemorySizeInBytes(10240L) - .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableBasePath) @@ -1476,7 +1463,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testAvroLogRecordReaderWithCommitBeforeAndAfterRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); @@ -1549,7 +1535,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReaderSchema(schema) .withLatestInstantTime("103") .withMaxMemorySizeInBytes(10240L) - .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableBasePath) @@ -1582,8 +1567,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @ParameterizedTest @MethodSource("testArguments") public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType, - boolean isCompressionEnabled, - boolean readBlocksLazily) + boolean isCompressionEnabled) throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); // Set a small threshold so that every block is a new version @@ -1664,7 +1648,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReaderSchema(schema) .withLatestInstantTime("104") .withMaxMemorySizeInBytes(10240L) - .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableBasePath) @@ -1703,7 +1686,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { @@ -1760,7 +1742,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { writer.appendBlock(commandBlock); writer.close(); - checkLogBlocksAndKeys("100", schema, readBlocksLazily, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, + checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 0, 0, Option.empty()); FileCreateUtils.deleteDeltaCommit(basePath, "100", fs); } @@ -1769,7 +1751,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { @@ -1810,7 +1791,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { writer.appendBlock(commandBlock); writer.close(); - checkLogBlocksAndKeys("100", schema, readBlocksLazily, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, + checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 0, 0, Option.empty()); FileCreateUtils.deleteDeltaCommit(basePath, "100", fs); } @@ -1819,7 +1800,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); @@ -1847,7 +1827,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { writer.appendBlock(commandBlock); writer.close(); - checkLogBlocksAndKeys("100", schema, readBlocksLazily, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, + checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 100, 100, Option.empty()); } @@ -1855,7 +1835,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { @@ -1900,7 +1879,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { writer.appendBlock(commandBlock); writer.close(); - checkLogBlocksAndKeys("101", schema, readBlocksLazily, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, + checkLogBlocksAndKeys("101", schema, diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, 0, 0, Option.empty()); } @@ -1909,7 +1888,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testLogReaderWithDifferentVersionsOfDeleteBlocks(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); @@ -1990,7 +1968,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReaderSchema(schema) .withLatestInstantTime("103") .withMaxMemorySizeInBytes(10240L) - .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableBasePath) @@ -2057,7 +2034,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { FileCreateUtils.createDeltaCommit(basePath, "101", fs); // Should be able to read all 110 records - checkLogBlocksAndKeys("101", schema, true, ExternalSpillableMap.DiskMapType.BITCASK, false, + checkLogBlocksAndKeys("101", schema, ExternalSpillableMap.DiskMapType.BITCASK, false, false, 110, 110, Option.empty()); // Write a rollback for commit 100 which is not the latest commit @@ -2068,7 +2045,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { writer.appendBlock(commandBlock); // Should only be able to read 10 records from commit 101 - checkLogBlocksAndKeys("101", schema, true, ExternalSpillableMap.DiskMapType.BITCASK, false, + checkLogBlocksAndKeys("101", schema, ExternalSpillableMap.DiskMapType.BITCASK, false, false, 10, 10, Option.empty()); // Write a rollback for commit 101 which is the latest commit @@ -2080,7 +2057,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { writer.close(); // Should not read any records as both commits are rolled back - checkLogBlocksAndKeys("101", schema, true, ExternalSpillableMap.DiskMapType.BITCASK, false, + checkLogBlocksAndKeys("101", schema, ExternalSpillableMap.DiskMapType.BITCASK, false, false, 0, 0, Option.empty()); } @@ -2088,7 +2065,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) throws IOException, URISyntaxException, InterruptedException { @@ -2171,7 +2147,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { writer.appendBlock(commandBlock); writer.close(); - checkLogBlocksAndKeys("101", schema, true, ExternalSpillableMap.DiskMapType.BITCASK, false, + checkLogBlocksAndKeys("101", schema, ExternalSpillableMap.DiskMapType.BITCASK, false, false, 0, 0, Option.empty()); FileCreateUtils.deleteDeltaCommit(basePath, "100", fs); } @@ -2179,8 +2155,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @ParameterizedTest @MethodSource("testArgumentsWithoutOptimizedScanArg") public void testAvroLogRecordReaderWithMixedInsertsCorruptsRollbackAndMergedLogBlock(ExternalSpillableMap.DiskMapType diskMapType, - boolean isCompressionEnabled, - boolean readBlocksLazily) + boolean isCompressionEnabled) throws IOException, URISyntaxException, InterruptedException { // Write blocks in this manner. @@ -2344,7 +2319,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReaderSchema(schema) .withLatestInstantTime("108") .withMaxMemorySizeInBytes(10240L) - .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableBasePath) @@ -2384,7 +2358,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2, ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) { try { // Write one Data block with same InstantTime (written in same batch) @@ -2433,7 +2406,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReaderSchema(schema) .withLatestInstantTime("100") .withMaxMemorySizeInBytes(10240L) - .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableBasePath) @@ -2454,47 +2426,43 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { @MethodSource("testArguments") public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) { /* * FIRST_ATTEMPT_FAILED: * Original task from the stage attempt failed, but subsequent stage retry succeeded. */ testAvroLogRecordReaderMergingMultipleLogFiles(77, 100, - diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan); + diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan); } @ParameterizedTest @MethodSource("testArguments") public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) { /* * SECOND_ATTEMPT_FAILED: * Original task from stage attempt succeeded, but subsequent retry attempt failed. */ testAvroLogRecordReaderMergingMultipleLogFiles(100, 66, - diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan); + diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan); } @ParameterizedTest @MethodSource("testArguments") public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts(ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, - boolean readBlocksLazily, boolean enableOptimizedLogBlocksScan) { /* * BOTH_ATTEMPTS_SUCCEEDED: * Original task from the stage attempt and duplicate task from the stage retry succeeded. */ testAvroLogRecordReaderMergingMultipleLogFiles(100, 100, - diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan); + diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testBasicAppendAndReadInReverse(boolean readBlocksLazily) + @Test + public void testBasicAppendAndReadInReverse() throws IOException, URISyntaxException, InterruptedException { Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) @@ -2534,7 +2502,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { FileCreateUtils.createDeltaCommit(basePath, "100", fs); HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen()); - try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, readBlocksLazily, true)) { + try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, true)) { assertTrue(reader.hasPrev(), "Last block should be available"); HoodieLogBlock prevBlock = reader.prev(); @@ -2568,9 +2536,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAppendAndReadOnCorruptedLogInReverse(boolean readBlocksLazily) + @Test + public void testAppendAndReadOnCorruptedLogInReverse() throws IOException, URISyntaxException, InterruptedException { Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) @@ -2615,8 +2582,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { // First round of reads - we should be able to read the first block and then EOF HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen()); - try (HoodieLogFileReader reader = - new HoodieLogFileReader(fs, logFile, schema, BUFFER_SIZE, readBlocksLazily, true)) { + try (HoodieLogFileReader reader = new HoodieLogFileReader(fs, logFile, schema, BUFFER_SIZE, true)) { assertTrue(reader.hasPrev(), "Last block should be available"); HoodieLogBlock block = reader.prev(); @@ -2629,9 +2595,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testBasicAppendAndTraverseInReverse(boolean readBlocksLazily) + @Test + public void testBasicAppendAndTraverseInReverse() throws IOException, URISyntaxException, InterruptedException { Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) @@ -2668,7 +2633,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), fs.getFileStatus(writer.getLogFile().getPath()).getLen()); try (HoodieLogFileReader reader = - new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, readBlocksLazily, true)) { + new HoodieLogFileReader(fs, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, true)) { assertTrue(reader.hasPrev(), "Third block should be available"); reader.moveToPrev(); @@ -2758,7 +2723,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { List<GenericRecord> projectedRecords = HoodieAvroUtils.rewriteRecords(records, projectedSchema); - try (Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), projectedSchema, true, false)) { + try (Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), projectedSchema, false)) { assertTrue(reader.hasNext(), "First block should be available"); HoodieLogBlock nextBlock = reader.next(); @@ -2826,29 +2791,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } private static Stream<Arguments> testArguments() { - // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: readBlocksLazily, Arg4: enableOptimizedLogBlocksScan - return Stream.of( - arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false, true), - arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false, true), - arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false, true), - arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false, true), - arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true, true), - arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true, true), - arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true, true), - arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true, true), - arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false, false), - arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false, false), - arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false, false), - arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false, false), - arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true, false), - arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true, false), - arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true, false), - arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true, false) - ); - } - - private static Stream<Arguments> testArgumentsWithoutOptimizedScanArg() { - // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: readBlocksLazily + // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: enableOptimizedLogBlocksScan return Stream.of( arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false), arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false), @@ -2861,6 +2804,16 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { ); } + private static Stream<Arguments> testArgumentsWithoutOptimizedScanArg() { + // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled + return Stream.of( + arguments(ExternalSpillableMap.DiskMapType.BITCASK, false), + arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false), + arguments(ExternalSpillableMap.DiskMapType.BITCASK, true), + arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true) + ); + } + private static Set<HoodieLogFile> writeLogFiles(Path partitionPath, Schema schema, List<IndexedRecord> records, @@ -2970,8 +2923,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { return reader; } - private void checkLogBlocksAndKeys(String latestInstantTime, Schema schema, boolean readBlocksLazily, - ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan, int expectedTotalRecords, + private void checkLogBlocksAndKeys(String latestInstantTime, Schema schema, ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan, int expectedTotalRecords, int expectedTotalKeys, Option<Set<String>> expectedKeys) throws IOException { List<String> allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", HoodieLogFile.DELTA_EXTENSION, "100") @@ -2984,7 +2937,6 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReaderSchema(schema) .withLatestInstantTime(latestInstantTime) .withMaxMemorySizeInBytes(10240L) - .withReadBlocksLazily(readBlocksLazily) .withReverseReader(false) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableBasePath) diff --git a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java index 7fc93c776f5..6790b602186 100644 --- a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java +++ b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java @@ -353,7 +353,6 @@ public class TestQuickstartData { .withLogFilePaths(logPaths) .withReaderSchema(readSchema) .withLatestInstantTime(instant) - .withReadBlocksLazily(false) .withReverseReader(false) .withBufferSize(16 * 1024 * 1024) .withMaxMemorySizeInBytes(1024 * 1024L) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index ecfc26a10dc..5970dc782b6 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -282,7 +282,6 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven .withReaderSchema(readerSchema) .withLatestInstantTime(instantTime) .withMaxMemorySizeInBytes(maxMemoryPerCompaction) - .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled()) .withReverseReader(writeConfig.getCompactionReverseLogReadEnabled()) .withBufferSize(writeConfig.getMaxDFSStreamBufferSize()) .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath()) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index baa9f21216b..b10b5be9c47 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -159,7 +159,6 @@ public class FormatUtils { .withReaderSchema(logSchema) .withInternalSchema(internalSchema) .withLatestInstantTime(split.getLatestCommit()) - .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled()) .withReverseReader(false) .withBufferSize(writeConfig.getMaxDFSStreamBufferSize()) .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes()) @@ -201,10 +200,6 @@ public class FormatUtils { .withReaderSchema(logSchema) .withInternalSchema(internalSchema) .withLatestInstantTime(split.getLatestCommit()) - .withReadBlocksLazily( - string2Boolean( - flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, - HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) .withReverseReader(false) .withBufferSize( flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, @@ -265,7 +260,6 @@ public class FormatUtils { .withLogFilePaths(logPaths) .withReaderSchema(logSchema) .withLatestInstantTime(latestInstantTime) - .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled()) .withReverseReader(false) .withBufferSize(writeConfig.getMaxDFSStreamBufferSize()) .withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge()) diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 65c8e82ada1..91e10a3fb9c 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -949,7 +949,6 @@ public class TestData { .withLogFilePaths(logPaths) .withReaderSchema(readSchema) .withLatestInstantTime(instant) - .withReadBlocksLazily(false) .withReverseReader(false) .withBufferSize(16 * 1024 * 1024) .withMaxMemorySizeInBytes(1024 * 1024L) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java index 4a39b6548f9..b7ec3b12403 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java @@ -48,8 +48,6 @@ import java.util.stream.Collectors; import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED; import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE; -import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP; -import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED; import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE; import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH; import static org.apache.hudi.hadoop.config.HoodieRealtimeConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN; @@ -185,7 +183,6 @@ public class HoodieMergeOnReadSnapshotReader extends AbstractRealtimeRecordReade .withReaderSchema(readerSchema) .withLatestInstantTime(latestInstantTime) .withMaxMemorySizeInBytes(getMaxCompactionMemoryInBytes(jobConf)) - .withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) .withReverseReader(false) .withBufferSize(jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE_PROP, DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) .withSpillableMapBasePath(jobConf.get(SPILLABLE_MAP_BASE_PATH_PROP, DEFAULT_SPILLABLE_MAP_BASE_PATH)) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index 61933608e94..5ef1c8d692d 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -89,7 +89,6 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader .withReaderSchema(getLogScannerReaderSchema()) .withLatestInstantTime(split.getMaxCommitTime()) .withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf)) - .withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) .withReverseReader(false) .withBufferSize(jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) .withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java index dd0ef5bf15d..ed40f4dd47c 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java @@ -81,7 +81,6 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader .withLogFilePaths(split.getDeltaLogPaths()) .withReaderSchema(getReaderSchema()) .withLatestInstantTime(split.getMaxCommitTime()) - .withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) .withReverseReader(false) .withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java index 02d534d5b98..edd68ca7baa 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java @@ -287,7 +287,6 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { .filterCompletedInstants().lastInstant().get().getTimestamp()) .withMaxMemorySizeInBytes( HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES) - .withReadBlocksLazily(true) .withReverseReader(false) .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue()) .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath()) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala index 3a86a2cc738..b6a5ae7a956 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala @@ -385,10 +385,6 @@ object LogFileIterator extends SparkAdapterSupport { // NOTE: This part shall only be reached when at least one log is present in the file-group // entailing that table has to have at least one commit .withLatestInstantTime(tableState.latestCommitTimestamp.get) - .withReadBlocksLazily( - Try(hadoopConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, - HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean) - .getOrElse(false)) .withReverseReader(false) .withInternalSchema(internalSchema) .withBufferSize( diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala index cca1fd1da0d..fa220acf7b2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala @@ -71,7 +71,6 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil .withLogFilePaths(logFilePaths.asJava) .withReaderSchema(schema) .withLatestInstantTime(client.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp) - .withReadBlocksLazily(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE.defaultValue)) .withReverseReader(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue)) .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue) .withMaxMemorySizeInBytes(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index 8f19ac76287..fa34e24cc5e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -24,6 +24,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.bloom.BloomFilter; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; @@ -36,6 +37,7 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.model.HoodieWriteStat; @@ -99,13 +101,6 @@ import java.util.stream.Collectors; import scala.Tuple2; -import static org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER; -import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD; -import static org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD; -import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD; -import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME; -import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN; -import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; import static org.apache.hudi.hadoop.fs.CachingPath.getPathWithoutSchemeAndAuthority; import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath; @@ -498,11 +493,9 @@ public class HoodieMetadataTableValidator implements Serializable { instant = new HoodieInstant(HoodieInstant.State.REQUESTED, instant.getAction(), instant.getTimestamp()); HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(metaClient, instant); - return cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().flatMap(cleanerFileInfoList -> { - return cleanerFileInfoList.stream().map(fileInfo -> { - return new Path(fileInfo.getFilePath()).getName(); - }); - }); + return cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().flatMap(cleanerFileInfoList -> + cleanerFileInfoList.stream().map(fileInfo -> new Path(fileInfo.getFilePath()).getName()) + ); } catch (IOException e) { throw new HoodieIOException("Error reading cleaner metadata for " + instant); @@ -533,7 +526,7 @@ public class HoodieMetadataTableValidator implements Serializable { engineContext.parallelize(allPartitions, allPartitions.size()).map(partitionPath -> { try { validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath, finalBaseFilesForCleaning); - LOG.info(String.format("Metadata table validation succeeded for partition %s (partition %s)", partitionPath, taskLabels)); + LOG.info("Metadata table validation succeeded for partition {} (partition {})", partitionPath, taskLabels); return Pair.<Boolean, Exception>of(true, null); } catch (HoodieValidationException e) { LOG.error( @@ -569,10 +562,10 @@ public class HoodieMetadataTableValidator implements Serializable { } if (finalResult) { - LOG.info(String.format("Metadata table validation succeeded (%s).", taskLabels)); + LOG.info("Metadata table validation succeeded ({}).", taskLabels); return true; } else { - LOG.warn(String.format("Metadata table validation failed (%s).", taskLabels)); + LOG.warn("Metadata table validation failed ({}).", taskLabels); return false; } } catch (Exception e) { @@ -644,9 +637,9 @@ public class HoodieMetadataTableValidator implements Serializable { if (partitionCreationTimeOpt.isPresent() && !completedTimeline.containsInstant(partitionCreationTimeOpt.get())) { Option<HoodieInstant> lastInstant = completedTimeline.lastInstant(); if (lastInstant.isPresent() - && HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, lastInstant.get().getTimestamp())) { - LOG.warn("Ignoring additional partition " + partitionFromDMT + ", as it was deduced to be part of a " - + "latest completed commit which was inflight when FS based listing was polled."); + && HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), HoodieTimeline.GREATER_THAN, lastInstant.get().getTimestamp())) { + LOG.warn("Ignoring additional partition {}, as it was deduced to be part of a " + + "latest completed commit which was inflight when FS based listing was polled.", partitionFromDMT); actualAdditionalPartitionsInMDT.remove(partitionFromDMT); } } @@ -702,7 +695,7 @@ public class HoodieMetadataTableValidator implements Serializable { Option<HoodieInstant> lastInstant = completedTimeline.lastInstant(); return lastInstant.isPresent() && HoodieTimeline.compareTimestamps( - instantTime, LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp()); + instantTime, HoodieTimeline.LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp()); } return true; } else { @@ -782,8 +775,8 @@ public class HoodieMetadataTableValidator implements Serializable { .collect(Collectors.toList()); } - LOG.debug("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath); - LOG.debug("All file slices from direct listing: " + allFileSlicesFromFS + ". For partitions " + partitionPath); + LOG.debug("All file slices from metadata: {}. For partitions {}", allFileSlicesFromMeta, partitionPath); + LOG.debug("All file slices from direct listing: {}. For partitions {}", allFileSlicesFromFS, partitionPath); validateFileSlices( allFileSlicesFromMeta, allFileSlicesFromFS, partitionPath, fsBasedContext.getMetaClient(), "all file groups"); @@ -809,8 +802,8 @@ public class HoodieMetadataTableValidator implements Serializable { latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath); } - LOG.debug("Latest base file from metadata: " + latestFilesFromMetadata + ". For partitions " + partitionPath); - LOG.debug("Latest base file from direct listing: " + latestFilesFromFS + ". For partitions " + partitionPath); + LOG.debug("Latest base file from metadata: {}. For partitions {}", latestFilesFromMetadata, partitionPath); + LOG.debug("Latest base file from direct listing: {}. For partitions {}", latestFilesFromFS, partitionPath); validate(latestFilesFromMetadata, latestFilesFromFS, partitionPath, "latest base files"); } @@ -834,8 +827,8 @@ public class HoodieMetadataTableValidator implements Serializable { latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath); } - LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath); - LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath); + LOG.debug("Latest file list from metadata: {}. For partition {}", latestFileSlicesFromMetadataTable, partitionPath); + LOG.debug("Latest file list from direct listing: {}. For partition {}", latestFileSlicesFromFS, partitionPath); validateFileSlices( latestFileSlicesFromMetadataTable, latestFileSlicesFromFS, partitionPath, @@ -906,7 +899,7 @@ public class HoodieMetadataTableValidator implements Serializable { String basePath = metaClient.getBasePathV2().toString(); long countKeyFromTable = sparkEngineContext.getSqlContext().read().format("hudi") .load(basePath) - .select(RECORD_KEY_METADATA_FIELD) + .select(HoodieRecord.RECORD_KEY_METADATA_FIELD) .count(); long countKeyFromRecordIndex = sparkEngineContext.getSqlContext().read().format("hudi") .load(getMetadataTableBasePath(basePath)) @@ -915,14 +908,12 @@ public class HoodieMetadataTableValidator implements Serializable { .count(); if (countKeyFromTable != countKeyFromRecordIndex) { - String message = String.format("Validation of record index count failed: " - + "%s entries from record index metadata, %s keys from the data table: " + cfg.basePath, - countKeyFromRecordIndex, countKeyFromTable); + String message = String.format("Validation of record index count failed: %s entries from record index metadata, %s keys from the data table: %s", + countKeyFromRecordIndex, countKeyFromTable, cfg.basePath); LOG.error(message); throw new HoodieValidationException(message); } else { - LOG.info(String.format( - "Validation of record index count succeeded: %s entries. Table: %s", countKeyFromRecordIndex, cfg.basePath)); + LOG.info("Validation of record index count succeeded: {} entries. Table: {}", countKeyFromRecordIndex, cfg.basePath); } } @@ -932,11 +923,11 @@ public class HoodieMetadataTableValidator implements Serializable { String basePath = metaClient.getBasePathV2().toString(); JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd = sparkEngineContext.getSqlContext().read().format("hudi").load(basePath) - .select(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD) + .select(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD) .toJavaRDD() - .mapToPair(row -> new Tuple2<>(row.getString(row.fieldIndex(RECORD_KEY_METADATA_FIELD)), - Pair.of(row.getString(row.fieldIndex(PARTITION_PATH_METADATA_FIELD)), - FSUtils.getFileId(row.getString(row.fieldIndex(FILENAME_METADATA_FIELD)))))) + .mapToPair(row -> new Tuple2<>(row.getString(row.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)), + Pair.of(row.getString(row.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)), + FSUtils.getFileId(row.getString(row.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)))))) .cache(); JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd = @@ -970,7 +961,6 @@ public class HoodieMetadataTableValidator implements Serializable { .map(e -> { Optional<Pair<String, String>> locationOnFs = e._2._1; Optional<Pair<String, String>> locationFromRecordIndex = e._2._2; - StringBuilder sb = new StringBuilder(); List<String> errorSampleList = new ArrayList<>(); if (locationOnFs.isPresent() && locationFromRecordIndex.isPresent()) { if (locationOnFs.get().getLeft().equals(locationFromRecordIndex.get().getLeft()) @@ -1036,8 +1026,7 @@ public class HoodieMetadataTableValidator implements Serializable { LOG.error(message); throw new HoodieValidationException(message); } else { - LOG.info(String.format( - "Validation of record index content succeeded: %s entries. Table: %s", countKey, cfg.basePath)); + LOG.info("Validation of record index content succeeded: {} entries. Table: {}", countKey, cfg.basePath); } } @@ -1082,7 +1071,7 @@ public class HoodieMetadataTableValidator implements Serializable { LOG.error(message); throw new HoodieValidationException(message); } else { - LOG.info(String.format("Validation of %s succeeded for partition %s for table: %s", label, partitionPath, cfg.basePath)); + LOG.info("Validation of {} succeeded for partition {} for table: {}", label, partitionPath, cfg.basePath); } } @@ -1109,8 +1098,7 @@ public class HoodieMetadataTableValidator implements Serializable { mismatch = true; break; } else { - LOG.warn(String.format("There are uncommitted log files in the latest file slices " - + "but the committed log files match: %s %s", fileSlice1, fileSlice2)); + LOG.warn("There are uncommitted log files in the latest file slices but the committed log files match: {} {}", fileSlice1, fileSlice2); } } } @@ -1122,7 +1110,7 @@ public class HoodieMetadataTableValidator implements Serializable { LOG.error(message); throw new HoodieValidationException(message); } else { - LOG.info(String.format("Validation of %s succeeded for partition %s for table: %s ", label, partitionPath, cfg.basePath)); + LOG.info("Validation of {} succeeded for partition {} for table: {}", label, partitionPath, cfg.basePath); } } @@ -1154,13 +1142,11 @@ public class HoodieMetadataTableValidator implements Serializable { FileSystem fileSystem = metaClient.getFs(); if (hasCommittedLogFiles(fileSystem, fs1LogPathSet, metaClient, committedFilesMap)) { - LOG.error("The first file slice has committed log files that cause mismatching: " + fs1 - + "; Different log files are: " + fs1LogPathSet); + LOG.error("The first file slice has committed log files that cause mismatching: {}; Different log files are: {}", fs1, fs1LogPathSet); return false; } if (hasCommittedLogFiles(fileSystem, fs2LogPathSet, metaClient, committedFilesMap)) { - LOG.error("The second file slice has committed log files that cause mismatching: " + fs2 - + "; Different log files are: " + fs2LogPathSet); + LOG.error("The second file slice has committed log files that cause mismatching: {}; Different log files are: {}", fs2, fs2LogPathSet); return false; } return true; @@ -1187,17 +1173,16 @@ public class HoodieMetadataTableValidator implements Serializable { MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, new Path(logFilePathStr)); if (messageType == null) { - LOG.warn(String.format("Cannot read schema from log file %s. " - + "Skip the check as it's likely being written by an inflight instant.", logFilePathStr)); + LOG.warn("Cannot read schema from log file {}. Skip the check as it's likely being written by an inflight instant.", logFilePathStr); continue; } Schema readerSchema = converter.convert(messageType); reader = - HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), readerSchema, true, false); + HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), readerSchema, false); // read the avro blocks if (reader.hasNext()) { HoodieLogBlock block = reader.next(); - final String instantTime = block.getLogBlockHeader().get(INSTANT_TIME); + final String instantTime = block.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME); if (completedInstantsTimeline.containsInstant(instantTime)) { // The instant is completed, in active timeline // Checking commit metadata only as log files can only be written by COMMIT or DELTA_COMMIT @@ -1225,36 +1210,30 @@ public class HoodieMetadataTableValidator implements Serializable { // behavior. String relativeLogFilePathStr = getRelativePath(basePath, logFilePathStr); if (committedFilesMap.get(instantTime).contains(relativeLogFilePathStr)) { - LOG.warn("Log file is committed in an instant in active timeline: instantTime=" - + instantTime + " " + logFilePathStr); + LOG.warn("Log file is committed in an instant in active timeline: instantTime={} {}", instantTime, logFilePathStr); return true; } else { - LOG.warn("Log file is uncommitted in a completed instant, likely due to retry: " - + "instantTime=" + instantTime + " " + logFilePathStr); + LOG.warn("Log file is uncommitted in a completed instant, likely due to retry: instantTime={} {}", instantTime, logFilePathStr); } } else if (completedInstantsTimeline.isBeforeTimelineStarts(instantTime)) { // The instant is in archived timeline - LOG.warn("Log file is committed in an instant in archived timeline: instantTime=" - + instantTime + " " + logFilePathStr); + LOG.warn("Log file is committed in an instant in archived timeline: instantTime={} {}", instantTime, logFilePathStr); return true; } else if (inflightInstantsTimeline.containsInstant(instantTime)) { // The instant is inflight in active timeline // hit an uncommitted block possibly from a failed write - LOG.warn("Log file is uncommitted because of an inflight instant: instantTime=" - + instantTime + " " + logFilePathStr); + LOG.warn("Log file is uncommitted because of an inflight instant: instantTime={} {}", instantTime, logFilePathStr); } else { // The instant is after the start of the active timeline, // but it cannot be found in the active timeline - LOG.warn("Log file is uncommitted because the instant is after the start of the " - + "active timeline but absent or in requested in the active timeline: instantTime=" - + instantTime + " " + logFilePathStr); + LOG.warn("Log file is uncommitted because the instant is after the start of the active timeline but absent or in requested in the active timeline: instantTime={} {}", + instantTime, logFilePathStr); } } else { - LOG.warn("There is no log block in " + logFilePathStr); + LOG.warn("There is no log block in {}", logFilePathStr); } } catch (IOException e) { - LOG.warn(String.format("Cannot read log file %s: %s. " - + "Skip the check as it's likely being written by an inflight instant.", + LOG.warn(String.format("Cannot read log file %s: %s. Skip the check as it's likely being written by an inflight instant.", logFilePathStr, e.getMessage()), e); } finally { FileIOUtils.closeQuietly(reader); @@ -1289,8 +1268,7 @@ public class HoodieMetadataTableValidator implements Serializable { long toSleepMs = cfg.minValidateIntervalSeconds * 1000 - (System.currentTimeMillis() - start); if (toSleepMs > 0) { - LOG.info("Last validate ran less than min validate interval: " + cfg.minValidateIntervalSeconds + " s, sleep: " - + toSleepMs + " ms."); + LOG.info("Last validate ran less than min validate interval: {} s, sleep: {} ms.", cfg.minValidateIntervalSeconds, toSleepMs); Thread.sleep(toSleepMs); } } catch (HoodieValidationException e) { @@ -1376,7 +1354,7 @@ public class HoodieMetadataTableValidator implements Serializable { .build(); this.fileSystemView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, metaClient, metadataConfig); - this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath()); + this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePathV2().toString()); if (metaClient.getCommitsTimeline().filterCompletedInstants().countInstants() > 0) { this.allColumnNameList = getAllColumnNames(); } @@ -1408,7 +1386,7 @@ public class HoodieMetadataTableValidator implements Serializable { @SuppressWarnings({"rawtypes", "unchecked"}) public List<HoodieColumnRangeMetadata<Comparable>> getSortedColumnStatsList( String partitionPath, List<String> baseFileNameList) { - LOG.info("All column names for getting column stats: " + allColumnNameList); + LOG.info("All column names for getting column stats: {}", allColumnNameList); if (enableMetadataTable) { List<Pair<String, String>> partitionFileNameList = baseFileNameList.stream() .map(filename -> Pair.of(partitionPath, filename)).collect(Collectors.toList()); @@ -1424,7 +1402,7 @@ public class HoodieMetadataTableValidator implements Serializable { return baseFileNameList.stream().flatMap(filename -> new ParquetUtils().readRangeFromParquetMetadata( metaClient.getHadoopConf(), - new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath), filename), + new Path(FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPath), filename), allColumnNameList).stream()) .sorted(new HoodieColumnRangeMetadataComparator()) .collect(Collectors.toList()); @@ -1460,7 +1438,7 @@ public class HoodieMetadataTableValidator implements Serializable { return schemaResolver.getTableAvroSchema().getFields().stream() .map(Schema.Field::name).collect(Collectors.toList()); } catch (Exception e) { - throw new HoodieException("Failed to get all column names for " + metaClient.getBasePath()); + throw new HoodieException("Failed to get all column names for " + metaClient.getBasePathV2()); } } @@ -1468,17 +1446,17 @@ public class HoodieMetadataTableValidator implements Serializable { Path path = new Path(FSUtils.getPartitionPath(metaClient.getBasePathV2(), partitionPath), filename); BloomFilter bloomFilter; HoodieConfig hoodieConfig = new HoodieConfig(); - hoodieConfig.setValue(USE_NATIVE_HFILE_READER, - Boolean.toString(ConfigUtils.getBooleanWithAltKeys(props, USE_NATIVE_HFILE_READER))); + hoodieConfig.setValue(HoodieReaderConfig.USE_NATIVE_HFILE_READER, + Boolean.toString(ConfigUtils.getBooleanWithAltKeys(props, HoodieReaderConfig.USE_NATIVE_HFILE_READER))); try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) .getFileReader(hoodieConfig, metaClient.getHadoopConf(), path)) { bloomFilter = fileReader.readBloomFilter(); if (bloomFilter == null) { - LOG.error("Failed to read bloom filter for " + path); + LOG.error("Failed to read bloom filter for {}", path); return Option.empty(); } } catch (IOException e) { - LOG.error("Failed to get file reader for " + path + " " + e.getMessage()); + LOG.error("Failed to get file reader for {} {}", path, e.getMessage()); return Option.empty(); } return Option.of(BloomFilterData.builder()
