This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 69ee790 [HUDI-1294] Adding inline read and seek based read (batch get) for hfile log blocks in metadata table (#3762)
69ee790 is described below
commit 69ee790a47a5fa90a6acd954a9330cce3ae31c3b
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Oct 29 12:12:44 2021 -0400
[HUDI-1294] Adding inline read and seek based read (batch get) for hfile log blocks in metadata table (#3762)
---
.../io/storage/TestHoodieHFileReaderWriter.java | 134 +++++++++++++++++++++
.../functional/TestHoodieBackedMetadata.java | 38 +++---
.../client/functional/TestHoodieMetadataBase.java | 16 ++-
.../hudi/common/config/HoodieMetadataConfig.java | 23 ++++
...ner.java => AbstractHoodieLogRecordReader.java} | 99 +++++++++------
.../hudi/common/table/log/HoodieLogFileReader.java | 9 +-
.../common/table/log/HoodieLogFormatReader.java | 9 +-
.../table/log/HoodieMergedLogRecordScanner.java | 11 +-
.../table/log/HoodieUnMergedLogRecordScanner.java | 4 +-
.../common/table/log/block/HoodieDataBlock.java | 13 +-
.../table/log/block/HoodieHFileDataBlock.java | 53 +++++++-
.../apache/hudi/io/storage/HoodieHFileReader.java | 34 +++++-
.../apache/hudi/metadata/BaseTableMetadata.java | 62 ++++++++--
.../hudi/metadata/HoodieBackedTableMetadata.java | 122 +++++++++++++------
...va => HoodieMetadataMergedLogRecordReader.java} | 55 +++++++--
.../realtime/TestHoodieRealtimeRecordReader.java | 21 +++-
.../hudi/hadoop/testutils/InputFormatTestUtil.java | 14 ++-
17 files changed, 583 insertions(+), 134 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
new file mode 100644
index 0000000..0492063
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieHFileReaderWriter {
+ @TempDir File tempDir;
+ private Path filePath;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ filePath = new Path(tempDir.toString() + "tempFile.txt");
+ }
+
+ @AfterEach
+ public void clearTempFile() {
+ File file = new File(filePath.toString());
+ if (file.exists()) {
+ file.delete();
+ }
+ }
+
+ private HoodieHFileWriter createHFileWriter(Schema avroSchema) throws Exception {
+ BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
+ Configuration conf = new Configuration();
+ TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class);
+ String instantTime = "000";
+
+ HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024,
+ filter);
+ return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier);
+ }
+
+ @Test
+ public void testWriteReadHFile() throws Exception {
+ Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc");
+ HoodieHFileWriter writer = createHFileWriter(avroSchema);
+ List<String> keys = new ArrayList<>();
+ Map<String, GenericRecord> recordMap = new HashMap<>();
+ for (int i = 0; i < 100; i++) {
+ GenericRecord record = new GenericData.Record(avroSchema);
+ String key = String.format("%s%04d", "key", i);
+ record.put("_row_key", key);
+ keys.add(key);
+ record.put("time", Integer.toString(RANDOM.nextInt()));
+ record.put("number", i);
+ writer.writeAvro(key, record);
+ recordMap.put(key, record);
+ }
+ writer.close();
+
+ Configuration conf = new Configuration();
+ CacheConfig cacheConfig = new CacheConfig(conf);
+ HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf));
+ List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
+ records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())));
+ hoodieHFileReader.close();
+
+ for (int i = 0; i < 20; i++) {
+ int randomRowstoFetch = 5 + RANDOM.nextInt(50);
+ Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys);
+ List<String> rowsList = new ArrayList<>(rowsToFetch);
+ Collections.sort(rowsList);
+ hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf));
+ List<Pair<String, GenericRecord>> result = hoodieHFileReader.readRecords(rowsList);
+ assertEquals(result.size(), randomRowstoFetch);
+ result.forEach(entry -> {
+ assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()));
+ });
+ hoodieHFileReader.close();
+ }
+ }
+
+ private Set<String> getRandomKeys(int count, List<String> keys) {
+ Set<String> rowKeys = new HashSet<>();
+ int totalKeys = keys.size();
+ while (rowKeys.size() < count) {
+ int index = RANDOM.nextInt(totalKeys);
+ if (!rowKeys.contains(index)) {
+ rowKeys.add(keys.get(index));
+ }
+ }
+ return rowKeys;
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 7ea9766..e0c61e1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -160,9 +160,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
doRollbackAndValidate(testTable, "0000003", "0000004");
}
- doWriteOperationAndValidate(testTable, "0000005");
-
- // trigger an upsert and validate
+ // trigger couple of upserts
+ doWriteOperation(testTable, "0000005");
doWriteOperation(testTable, "0000006");
validateMetadata(testTable, true);
}
@@ -222,9 +221,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
* Test various table operations sync to Metadata Table correctly.
*/
@ParameterizedTest
- @EnumSource(HoodieTableType.class)
- public void testTableOperations(HoodieTableType tableType) throws Exception {
- init(tableType);
+ @MethodSource("bootstrapAndTableOperationTestArgs")
+ public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
+ init(tableType, true, enableFullScan);
doWriteInsertAndUpsert(testTable);
// trigger an upsert
@@ -236,7 +235,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
// trigger an upsert
- doWriteOperationAndValidate(testTable, "0000005");
+ doWriteOperation(testTable, "0000005");
// trigger clean
doCleanAndValidate(testTable, "0000006", singletonList("0000001"));
@@ -255,7 +254,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
doWriteOperation(testTable, "0000002");
doCleanAndValidate(testTable, "0000003", Arrays.asList("0000001"));
if (tableType == MERGE_ON_READ) {
- doCompactionAndValidate(testTable, "0000004");
+ doCompaction(testTable, "0000004");
}
doWriteOperation(testTable, "0000005");
validateMetadata(testTable, emptyList(), true);
@@ -288,7 +287,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
doWriteOperationAndValidate(testTable, "0000003");
// trigger a commit and rollback
- doWriteOperationAndValidate(testTable, "0000004");
+ doWriteOperation(testTable, "0000004");
doRollbackAndValidate(testTable, "0000004", "0000005");
// trigger few upserts and validate
@@ -297,7 +296,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
validateMetadata(testTable);
- doWriteOperationAndValidate(testTable, "0000010");
+ doWriteOperation(testTable, "0000010");
// rollback last commit. and validate.
doRollbackAndValidate(testTable, "0000010", "0000011");
@@ -309,7 +308,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
// roll back of delete
- doWriteOperationAndValidate(testTable, "0000014", DELETE);
+ doWriteOperation(testTable, "0000014", DELETE);
doRollbackAndValidate(testTable, "0000014", "0000015");
// rollback partial commit
@@ -394,9 +393,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
syncTableMetadata(writeConfig);
validateMetadata(testTable);
- doWriteOperationAndValidate(testTable, "00000003", INSERT);
- doWriteOperationAndValidate(testTable, "00000004", UPSERT);
- doWriteOperationAndValidate(testTable, "00000005", UPSERT);
+ doWriteOperation(testTable, "00000003", INSERT);
+ doWriteOperation(testTable, "00000004", UPSERT);
+ doWriteOperation(testTable, "00000005", UPSERT);
// trigger compaction
if (MERGE_ON_READ.equals(tableType)) {
@@ -404,13 +403,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
// trigger an upsert
- doWriteOperationAndValidate(testTable, "00000008");
+ doWriteOperation(testTable, "00000008");
// trigger delete
- doWriteOperationAndValidate(testTable, "00000009", DELETE);
+ doWriteOperation(testTable, "00000009", DELETE);
// trigger clean
doCleanAndValidate(testTable, "00000010", asList("00000003", "00000004"));
// trigger another upsert
- doWriteOperationAndValidate(testTable, "00000011");
+ doWriteOperation(testTable, "00000011");
// trigger clustering
doClusterAndValidate(testTable, "00000012");
@@ -528,7 +527,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
records = dataGen.generateUniqueUpdates(newCommitTime, 10);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
// Write 4 (updates and inserts)
newCommitTime = "0000004";
@@ -552,7 +550,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
records = dataGen.generateUpdates(newCommitTime, 5);
writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
- validateMetadata(client);
// Compaction
if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
@@ -568,7 +565,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
client.startCommitWithTime(newCommitTime);
client.delete(deleteKeys, newCommitTime);
- validateMetadata(client);
// Clean
newCommitTime = "0000009";
@@ -1128,7 +1124,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
Collections.sort(fsFileNames);
Collections.sort(metadataFilenames);
- assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length);
+ assertEquals(fsStatuses.length, partitionToFilesMap.get(partitionPath.toString()).length);
// File sizes should be valid
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 85f869f..7a49daf 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -72,6 +72,10 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
}
public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException {
+ init(tableType, enableMetadataTable, true);
+ }
+
+ public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan) throws IOException {
this.tableType = tableType;
initPath();
initSparkContexts("TestHoodieMetadata");
@@ -80,7 +84,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
initMetaClient(tableType);
initTestDataGenerator();
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
- writeConfig = getWriteConfig(true, enableMetadataTable);
+ writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, false,
+ enableFullScan).build();
initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
}
@@ -256,7 +261,13 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics);
}
- protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) {
+ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
+ boolean enableMetrics) {
+ return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true);
+ }
+
+ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
+ boolean enableMetrics, boolean enableFullScan) {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
.withAutoCommit(autoCommit)
@@ -271,6 +282,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(useFileListingMetadata)
+ .enableFullScan(enableFullScan)
.enableMetrics(enableMetrics).build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).build())
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index b74a17c..d526294 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -115,6 +115,20 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Parallelism to use, when listing the table on lake storage.");
+ public static final ConfigProperty<Boolean> ENABLE_INLINE_READING = ConfigProperty
+ .key(METADATA_PREFIX + ".enable.inline.reading")
+ .defaultValue(true)
+ .sinceVersion("0.10.0")
+ .withDocumentation("Enable inline reading of Log files. By default log block contents are read as byte[] using regular input stream and records "
+ + "are deserialized from it. Enabling this will read each log block as an inline file and read records from the same. For instance, "
+ + "for HFileDataBlock, a inline file will be read using HFileReader.");
+
+ public static final ConfigProperty<Boolean> ENABLE_FULL_SCAN_LOG_FILES = ConfigProperty
+ .key(METADATA_PREFIX + ".enable.full.scan.log.files")
+ .defaultValue(true)
+ .sinceVersion("0.10.0")
+ .withDocumentation("Enable full scanning of log files while reading log records. If disabled, hudi does look up of only interested entries.");
+
private HoodieMetadataConfig() {
super();
}
@@ -143,6 +157,10 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return getString(DIR_FILTER_REGEX);
}
+ public boolean enableFullScan() {
+ return getBoolean(ENABLE_FULL_SCAN_LOG_FILES);
+ }
+
public static class Builder {
private final HoodieMetadataConfig metadataConfig = new HoodieMetadataConfig();
@@ -210,6 +228,11 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return this;
}
+ public Builder enableFullScan(boolean enableFullScan) {
+ metadataConfig.setValue(ENABLE_FULL_SCAN_LOG_FILES, String.valueOf(enableFullScan));
+ return this;
+ }
+
public HoodieMetadataConfig build() {
metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
return metadataConfig;
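
For orientation only (this example is not part of the commit), the builder method added above could be used to switch the metadata table to seek-based lookups; the class name SampleMetadataConfigUsage is illustrative:

    import org.apache.hudi.common.config.HoodieMetadataConfig;

    public class SampleMetadataConfigUsage {
      public static void main(String[] args) {
        // Enable the metadata table but turn off full scanning of its log files,
        // i.e. hoodie.metadata.enable.full.scan.log.files = false, so lookups seek only the requested keys.
        HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
            .enable(true)
            .enableFullScan(false)
            .build();
        // enableFullScan() is the accessor added in this change; it reflects the value set above.
        System.out.println(metadataConfig.enableFullScan());
      }
    }
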
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
similarity index 83%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 868c7cb..e2e76ad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -47,6 +47,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
@@ -71,9 +72,9 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlo
* <p>
* This results in two I/O passes over the log file.
*/
-public abstract class AbstractHoodieLogRecordScanner {
+public abstract class AbstractHoodieLogRecordReader {
- private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordScanner.class);
+ private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordReader.class);
// Reader schema for the records
protected final Schema readerSchema;
@@ -114,12 +115,23 @@ public abstract class AbstractHoodieLogRecordScanner {
private AtomicLong totalCorruptBlocks = new AtomicLong(0);
// Store the last instant log blocks (needed to implement rollback)
private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
+ // Enables full scan of log records
+ protected final boolean enableFullScan;
+ private int totalScannedLogFiles;
// Progress
private float progress = 0.0f;
- protected AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
- String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
- int bufferSize, Option<InstantRange> instantRange, boolean withOperationField) {
+ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
+ String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
+ int bufferSize, Option<InstantRange> instantRange, boolean withOperationField) {
+ this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField,
+ true);
+ }
+
+ protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
+ String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
+ int bufferSize, Option<InstantRange> instantRange, boolean withOperationField,
+ boolean enableFullScan) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
@@ -132,18 +144,27 @@ public abstract class AbstractHoodieLogRecordScanner {
}
this.totalLogFiles.addAndGet(logFilePaths.size());
this.logFilePaths = logFilePaths;
- this.readBlocksLazily = readBlocksLazily;
this.reverseReader = reverseReader;
+ this.readBlocksLazily = readBlocksLazily;
this.fs = fs;
this.bufferSize = bufferSize;
this.instantRange = instantRange;
this.withOperationField = withOperationField;
+ this.enableFullScan = enableFullScan;
}
- /**
- * Scan Log files.
- */
public void scan() {
+ scan(Option.empty());
+ }
+
+ public void scan(Option<List<String>> keys) {
+ currentInstantLogBlocks = new ArrayDeque<>();
+ progress = 0.0f;
+ totalLogFiles = new AtomicLong(0);
+ totalRollbacks = new AtomicLong(0);
+ totalCorruptBlocks = new AtomicLong(0);
+ totalLogBlocks = new AtomicLong(0);
+ totalLogRecords = new AtomicLong(0);
HoodieLogFormatReader logFormatReaderWrapper = null;
HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
@@ -152,7 +173,7 @@ public abstract class AbstractHoodieLogRecordScanner {
// iterate over the paths
logFormatReaderWrapper = new HoodieLogFormatReader(fs,
logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
- readerSchema, readBlocksLazily, reverseReader, bufferSize);
+ readerSchema, readBlocksLazily, reverseReader, bufferSize, !enableFullScan);
Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
while (logFormatReaderWrapper.hasNext()) {
HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
@@ -160,16 +181,16 @@ public abstract class AbstractHoodieLogRecordScanner {
scannedLogFiles.add(logFile);
totalLogFiles.set(scannedLogFiles.size());
// Use the HoodieLogFileReader to iterate through the blocks in the log file
- HoodieLogBlock r = logFormatReaderWrapper.next();
- final String instantTime = r.getLogBlockHeader().get(INSTANT_TIME);
+ HoodieLogBlock logBlock = logFormatReaderWrapper.next();
+ final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
totalLogBlocks.incrementAndGet();
- if (r.getBlockType() != CORRUPT_BLOCK
- && !HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
+ if (logBlock.getBlockType() != CORRUPT_BLOCK
+ && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
)) {
// hit a block with instant time greater than should be processed, stop processing further
break;
}
- if (r.getBlockType() != CORRUPT_BLOCK && r.getBlockType() != COMMAND_BLOCK) {
+ if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
|| inflightInstantsTimeline.containsInstant(instantTime)) {
// hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
@@ -180,28 +201,28 @@ public abstract class AbstractHoodieLogRecordScanner {
continue;
}
}
- switch (r.getBlockType()) {
+ switch (logBlock.getBlockType()) {
case HFILE_DATA_BLOCK:
case AVRO_DATA_BLOCK:
LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
- + r.getLogBlockHeader().get(INSTANT_TIME));
- if (isNewInstantBlock(r) && !readBlocksLazily) {
+ + logBlock.getLogBlockHeader().get(INSTANT_TIME));
+ if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is an avro data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
- processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
+ processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
}
// store the current block
- currentInstantLogBlocks.push(r);
+ currentInstantLogBlocks.push(logBlock);
break;
case DELETE_BLOCK:
LOG.info("Reading a delete block from file " + logFile.getPath());
- if (isNewInstantBlock(r) && !readBlocksLazily) {
+ if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
// If this is a delete data block belonging to a different commit/instant,
// then merge the last blocks and records into the main result
- processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
+ processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
}
// store deletes so can be rolled back
- currentInstantLogBlocks.push(r);
+ currentInstantLogBlocks.push(logBlock);
break;
case COMMAND_BLOCK:
// Consider the following scenario
@@ -218,9 +239,9 @@ public abstract class AbstractHoodieLogRecordScanner {
// both B1 & B2
LOG.info("Reading a command block from file " + logFile.getPath());
// This is a command block - take appropriate action based on the command
- HoodieCommandBlock commandBlock = (HoodieCommandBlock) r;
+ HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
String targetInstantForCommandBlock =
- r.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
+ logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
switch (commandBlock.getType()) { // there can be different types of command blocks
case ROLLBACK_PREVIOUS_BLOCK:
// Rollback the last read log block
@@ -264,7 +285,7 @@ public abstract class AbstractHoodieLogRecordScanner {
LOG.info("Found a corrupt block in " + logFile.getPath());
totalCorruptBlocks.incrementAndGet();
// If there is a corrupt block - we will assume that this was the next data block
- currentInstantLogBlocks.push(r);
+ currentInstantLogBlocks.push(logBlock);
break;
default:
throw new UnsupportedOperationException("Block type not supported yet");
@@ -273,7 +294,7 @@ public abstract class AbstractHoodieLogRecordScanner {
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty()) {
LOG.info("Merging the final data blocks");
- processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
+ processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
}
// Done
progress = 1.0f;
@@ -308,9 +329,14 @@ public abstract class AbstractHoodieLogRecordScanner {
* Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
* handle it.
*/
- private void processDataBlock(HoodieDataBlock dataBlock) throws Exception {
+ private void processDataBlock(HoodieDataBlock dataBlock, Option<List<String>> keys) throws Exception {
// TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here
- List<IndexedRecord> recs = dataBlock.getRecords();
+ List<IndexedRecord> recs = new ArrayList<>();
+ if (!keys.isPresent()) {
+ recs = dataBlock.getRecords();
+ } else {
+ recs = dataBlock.getRecords(keys.get());
+ }
totalLogRecords.addAndGet(recs.size());
for (IndexedRecord rec : recs) {
processNextRecord(createHoodieRecord(rec));
@@ -342,17 +368,18 @@ public abstract class AbstractHoodieLogRecordScanner {
/**
* Process the set of log blocks belonging to the last instant which is read fully.
*/
- private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> lastBlocks, int numLogFilesSeen) throws Exception {
- while (!lastBlocks.isEmpty()) {
- LOG.info("Number of remaining logblocks to merge " + lastBlocks.size());
+ private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int numLogFilesSeen,
+ Option<List<String>> keys) throws Exception {
+ while (!logBlocks.isEmpty()) {
+ LOG.info("Number of remaining logblocks to merge " + logBlocks.size());
// poll the element at the bottom of the stack since that's the order it was inserted
- HoodieLogBlock lastBlock = lastBlocks.pollLast();
+ HoodieLogBlock lastBlock = logBlocks.pollLast();
switch (lastBlock.getBlockType()) {
case AVRO_DATA_BLOCK:
- processDataBlock((HoodieAvroDataBlock) lastBlock);
+ processDataBlock((HoodieAvroDataBlock) lastBlock, keys);
break;
case HFILE_DATA_BLOCK:
- processDataBlock((HoodieHFileDataBlock) lastBlock);
+ processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
break;
case DELETE_BLOCK:
Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
@@ -432,6 +459,6 @@ public abstract class AbstractHoodieLogRecordScanner {
throw new UnsupportedOperationException();
}
- public abstract AbstractHoodieLogRecordScanner build();
+ public abstract AbstractHoodieLogRecordReader build();
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index f0f3842..cdf3065 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -70,17 +70,24 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private long reverseLogFilePosition;
private long lastReverseLogFilePosition;
private boolean reverseReader;
+ private boolean enableInlineReading;
private boolean closed = false;
private transient Thread shutdownThread = null;
public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
boolean readBlockLazily, boolean reverseReader) throws IOException {
+ this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false);
+ }
+
+ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
+ boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading) throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
this.logFile = logFile;
this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader;
+ this.enableInlineReading = enableInlineReading;
if (this.reverseReader) {
this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen();
}
@@ -248,7 +255,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
}
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
- contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
+ contentPosition, contentLength, blockEndPos, readerSchema, header, footer, enableInlineReading);
case DELETE_BLOCK:
return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
contentPosition, contentLength, blockEndPos, header, footer);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 7267227..36fa187 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -49,7 +49,12 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
- boolean reverseLogReader, int bufferSize) throws IOException {
+ boolean reverseLogReader, int bufferSize) throws IOException {
+ this(fs, logFiles, readerSchema, readBlocksLazily, reverseLogReader, bufferSize, false);
+ }
+
+ HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
+ boolean reverseLogReader, int bufferSize, boolean enableInlineReading) throws IOException {
this.logFiles = logFiles;
this.fs = fs;
this.readerSchema = readerSchema;
@@ -59,7 +64,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
this.prevReadersInOpenState = new ArrayList<>();
if (logFiles.size() > 0) {
HoodieLogFile nextLogFile = logFiles.remove(0);
- this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
+ this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 18b2672..a8d97ac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -54,7 +54,7 @@ import java.util.Map;
* This results in two I/O passes over the log file.
*/
-public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
+public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
@@ -77,8 +77,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
boolean reverseReader, int bufferSize, String spillableMapBasePath,
Option<InstantRange> instantRange, boolean autoScan,
ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled,
- boolean withOperationField) {
- super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField);
+ boolean withOperationField, boolean enableFullScan) {
+ super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField,
+ enableFullScan);
try {
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
@@ -166,7 +167,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
/**
* Builder used to build {@code HoodieUnMergedLogRecordScanner}.
*/
- public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
+ public static class Builder extends AbstractHoodieLogRecordReader.Builder {
protected FileSystem fs;
protected String basePath;
protected List<String> logFilePaths;
@@ -276,7 +277,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
bufferSize, spillableMapBasePath, instantRange, autoScan,
- diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField);
+ diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true);
}
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 8b26f72..f781a14 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -31,7 +31,7 @@ import java.util.List;
/**
* A scanner used to scan hoodie unmerged log records.
*/
-public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner {
+public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReader {
private final LogRecordScannerCallback callback;
@@ -72,7 +72,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
/**
* Builder used to build {@code HoodieUnMergedLogRecordScanner}.
*/
- public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
+ public static class Builder extends AbstractHoodieLogRecordReader.Builder {
private FileSystem fs;
private String basePath;
private List<String> logFilePaths;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 8f5b741..2e4338e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -111,6 +111,17 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
return records;
}
+ /**
+ * Batch get of keys of interest. Implementation can choose to either do full scan and return matched entries or
+ * do a seek based parsing and return matched entries.
+ * @param keys keys of interest.
+ * @return List of IndexedRecords for the keys of interest.
+ * @throws IOException
+ */
+ public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
+ throw new UnsupportedOperationException("On demand batch get based on interested keys not supported");
+ }
+
public Schema getSchema() {
// if getSchema was invoked before converting byte [] to records
if (records == null) {
@@ -119,7 +130,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
return schema;
}
- private void createRecordsFromContentBytes() throws IOException {
+ protected void createRecordsFromContentBytes() throws IOException {
if (readBlockLazily && !getContent().isPresent()) {
// read log block contents from disk
inflate();
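
The javadoc above defines the new batch-get contract on data blocks; a minimal caller-side sketch, mirroring the processDataBlock change in AbstractHoodieLogRecordReader earlier in this commit (DataBlockReadSketch and readBlock are illustrative names, not part of the patch):

    import java.io.IOException;
    import java.util.List;

    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hudi.common.table.log.block.HoodieDataBlock;
    import org.apache.hudi.common.util.Option;

    class DataBlockReadSketch {
      // When keys are supplied, an implementation such as HoodieHFileDataBlock can seek to just
      // those keys; otherwise every record in the block is deserialized.
      static List<IndexedRecord> readBlock(HoodieDataBlock dataBlock, Option<List<String>> keys) throws IOException {
        return keys.isPresent() ? dataBlock.getRecords(keys.get()) : dataBlock.getRecords();
      }
    }
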
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 6d2682a..a1e0c12 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -19,12 +19,16 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.HoodieHFileReader;
+
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -44,6 +48,7 @@ import org.apache.hadoop.hbase.util.Pair;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -61,6 +66,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
private static final Logger LOG = LogManager.getLogger(HoodieHFileDataBlock.class);
private static Compression.Algorithm compressionAlgorithm = Compression.Algorithm.GZ;
private static int blockSize = 1 * 1024 * 1024;
+ private boolean enableInlineReading = false;
public HoodieHFileDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
@Nonnull Map<HeaderMetadataType, String> logBlockFooter,
@@ -71,10 +77,11 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
- Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
+ Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer, boolean enableInlineReading) {
super(content, inputStream, readBlockLazily,
Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
footer);
+ this.enableInlineReading = enableInlineReading;
}
public HoodieHFileDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
@@ -142,6 +149,50 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
}
@Override
+ protected void createRecordsFromContentBytes() throws IOException {
+ if (enableInlineReading) {
+ getRecords(Collections.emptyList());
+ } else {
+ super.createRecordsFromContentBytes();
+ }
+ }
+
+ @Override
+ public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
+ readWithInlineFS(keys);
+ return records;
+ }
+
+ private void readWithInlineFS(List<String> keys) throws IOException {
+ boolean enableFullScan = keys.isEmpty();
+ // Get schema from the header
+ Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+ // If readerSchema was not present, use writerSchema
+ if (schema == null) {
+ schema = writerSchema;
+ }
+ Configuration conf = new Configuration();
+ CacheConfig cacheConf = new CacheConfig(conf);
+ Configuration inlineConf = new Configuration();
+ inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
+
+ Path inlinePath = InLineFSUtils.getInlineFilePath(
+ getBlockContentLocation().get().getLogFile().getPath(),
+ getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(),
+ getBlockContentLocation().get().getContentPositionInLogFile(),
+ getBlockContentLocation().get().getBlockSize());
+ if (!enableFullScan) {
+ // HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks.
+ Collections.sort(keys);
+ }
+ HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath, cacheConf, inlinePath.getFileSystem(inlineConf));
+ List<org.apache.hadoop.hbase.util.Pair<String, IndexedRecord>> logRecords = enableFullScan ? reader.readAllRecords(writerSchema, schema) :
+ reader.readRecords(keys, schema);
+ reader.close();
+ this.records = logRecords.stream().map(t -> t.getSecond()).collect(Collectors.toList());
+ }
+
+ @Override
protected void deserializeRecords() throws IOException {
// Get schema from the header
Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
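
The inline read above relies on Hudi's InLineFileSystem, which addresses a byte range of a log file as if it were a standalone file. A rough standalone sketch of that addressing, with the log path and offsets as placeholders (InlinePathSketch is an illustrative name, not part of the patch):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.fs.inline.InLineFSUtils;
    import org.apache.hudi.common.fs.inline.InLineFileSystem;

    class InlinePathSketch {
      static Path toInlinePath(Path logFilePath, long blockStartPos, long blockSize, Configuration conf) throws IOException {
        // Register the inline file system so inline paths can be opened through this configuration.
        conf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
        // Address the HFile data block embedded in the log file as its own "file".
        return InLineFSUtils.getInlineFilePath(logFilePath,
            logFilePath.getFileSystem(conf).getScheme(), blockStartPos, blockSize);
      }
    }
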
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
index b954e57..7b80d1a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
@@ -33,6 +34,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
@@ -55,6 +57,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
private Path path;
private Configuration conf;
private HFile.Reader reader;
+ private FSDataInputStream fsDataInputStream;
private Schema schema;
// Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each
// key retrieval.
@@ -72,6 +75,13 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
this.reader = HFile.createReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf);
}
+ public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem inlineFs) throws IOException {
+ this.conf = configuration;
+ this.path = path;
+ this.fsDataInputStream = inlineFs.open(path);
+ this.reader = HFile.createReader(inlineFs, path, cacheConfig, configuration);
+ }
+
public HoodieHFileReader(byte[] content) throws IOException {
Configuration conf = new Configuration();
Path path = new Path("hoodie");
@@ -164,6 +174,25 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
return readAllRecords(schema, schema);
}
+ public List<Pair<String, R>> readRecords(List<String> keys) throws IOException {
+ reader.loadFileInfo();
+ Schema schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get(KEY_SCHEMA.getBytes())));
+ return readRecords(keys, schema);
+ }
+
+ public List<Pair<String, R>> readRecords(List<String> keys, Schema schema) throws IOException {
+ this.schema = schema;
+ reader.loadFileInfo();
+ List<Pair<String, R>> records = new ArrayList<>();
+ for (String key: keys) {
+ Option<R> value = getRecordByKey(key, schema);
+ if (value.isPresent()) {
+ records.add(new Pair(key, value.get()));
+ }
+ }
+ return records;
+ }
+
@Override
public Iterator getRecordIterator(Schema readerSchema) throws IOException {
final HFileScanner scanner = reader.getScanner(false, false);
@@ -217,7 +246,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
synchronized (this) {
if (keyScanner == null) {
- keyScanner = reader.getScanner(true, true);
+ keyScanner = reader.getScanner(false, true);
}
if (keyScanner.seekTo(kv) == 0) {
@@ -250,6 +279,9 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
try {
reader.close();
reader = null;
+ if (fsDataInputStream != null) {
+ fsDataInputStream.close();
+ }
keyScanner = null;
} catch (IOException e) {
throw new HoodieIOException("Error closing the hfile reader", e);
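
A minimal sketch of exercising the new batch read against an HFile on a Hadoop FileSystem, along the lines of the TestHoodieHFileReaderWriter test added at the top of this commit (HFileBatchReadSketch and readKeys are illustrative names; the path is a placeholder):

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    import org.apache.avro.generic.IndexedRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.hudi.io.storage.HoodieHFileReader;

    class HFileBatchReadSketch {
      static List<Pair<String, IndexedRecord>> readKeys(Path hfilePath, List<String> keys) throws IOException {
        Configuration conf = new Configuration();
        // Records are laid out sorted by key inside an HFile, so sorting the requested keys
        // keeps the seeks moving forward instead of jumping around.
        Collections.sort(keys);
        HoodieHFileReader<IndexedRecord> reader =
            new HoodieHFileReader<>(conf, hfilePath, new CacheConfig(conf), hfilePath.getFileSystem(conf));
        try {
          // The single-argument overload resolves the Avro schema from the HFile's file info block.
          return reader.readRecords(keys);
        } finally {
          reader.close();
        }
      }
    }
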
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 1690c9a..b560b76 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hadoop.fs.FileStatus;
@@ -38,10 +39,13 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public abstract class BaseTableMetadata implements HoodieTableMetadata {
@@ -126,15 +130,12 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
}
@Override
- public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths)
+ public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitions)
throws IOException {
if (enabled) {
- Map<String, FileStatus[]> partitionsFilesMap = new HashMap<>();
-
try {
- for (String partitionPath : partitionPaths) {
- partitionsFilesMap.put(partitionPath, fetchAllFilesInPartition(new Path(partitionPath)));
- }
+ List<Path> partitionPaths = partitions.stream().map(entry -> new Path(entry)).collect(Collectors.toList());
+ Map<String, FileStatus[]> partitionsFilesMap = fetchAllFilesInPartitionPaths(partitionPaths);
return partitionsFilesMap;
} catch (Exception e) {
throw new HoodieMetadataException("Failed to retrieve files in partition from metadata", e);
@@ -142,7 +143,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
}
return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning())
- .getAllFilesInPartitions(partitionPaths);
+ .getAllFilesInPartitions(partitions);
}
/**
@@ -150,7 +151,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
*/
protected List<String> fetchAllPartitionPaths() throws IOException {
HoodieTimer timer = new HoodieTimer().startTimer();
- Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKeyFromMetadata(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
+ Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
List<String> partitions = Collections.emptyList();
@@ -184,7 +185,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
}
HoodieTimer timer = new HoodieTimer().startTimer();
- Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKeyFromMetadata(partitionName, MetadataPartitionType.FILES.partitionPath());
+ Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(partitionName, MetadataPartitionType.FILES.partitionPath());
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
FileStatus[] statuses = {};
@@ -200,7 +201,48 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
return statuses;
}
- protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key, String partitionName);
+ Map<String, FileStatus[]> fetchAllFilesInPartitionPaths(List<Path> partitionPaths) throws IOException {
+ Map<String, Path> partitionInfo = new HashMap<>();
+ boolean foundNonPartitionedPath = false;
+ for (Path partitionPath: partitionPaths) {
+ String partitionName = FSUtils.getRelativePartitionPath(new Path(dataBasePath), partitionPath);
+ if (partitionName.isEmpty()) {
+ if (partitionInfo.size() > 1) {
+ throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table");
+ }
+ partitionInfo.put(NON_PARTITIONED_NAME, partitionPath);
+ foundNonPartitionedPath = true;
+ } else {
+ if (foundNonPartitionedPath) {
+ throw new HoodieMetadataException("Found mix of partitioned and non partitioned paths while fetching data from metadata table");
+ }
+ partitionInfo.put(partitionName, partitionPath);
+ }
+ }
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> partitionsFileStatus =
+ getRecordsByKeys(new ArrayList<>(partitionInfo.keySet()), MetadataPartitionType.FILES.partitionPath());
+ metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
+ Map<String, FileStatus[]> result = new HashMap<>();
+
+ for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry: partitionsFileStatus) {
+ if (entry.getValue().isPresent()) {
+ if (!entry.getValue().get().getData().getDeletions().isEmpty()) {
+ throw new HoodieMetadataException("Metadata record for partition " + entry.getKey() + " is inconsistent: "
+ + entry.getValue().get().getData());
+ }
+ result.put(partitionInfo.get(entry.getKey()).toString(), entry.getValue().get().getData().getFileStatuses(hadoopConf.get(), partitionInfo.get(entry.getKey())));
+ }
+ }
+
+ LOG.info("Listed files in partitions from metadata: partition list =" + Arrays.toString(partitionPaths.toArray()));
+ return result;
+ }
+
+ protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName);
+
+ protected abstract List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> key, String partitionName);
protected HoodieEngineContext getEngineContext() {
return engineContext != null ? engineContext : new HoodieLocalEngineContext(hadoopConf.get());
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index b0940a7..bf0cf92 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -81,7 +81,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
private final boolean reuse;
// Readers for latest file slice corresponding to file groups in the metadata partition of interest
- private Map<String, Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner>> partitionReaders = new ConcurrentHashMap<>();
+ private Map<String, Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>> partitionReaders = new ConcurrentHashMap<>();
public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
String datasetBasePath, String spillableMapDirectory) {
@@ -120,18 +120,68 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
@Override
- protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key, String partitionName) {
- Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> readers = openReadersIfNeeded(key, partitionName);
+ protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName) {
+ return getRecordsByKeys(Collections.singletonList(key), partitionName).get(0).getValue();
+ }
+
+ protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys, String partitionName) {
+ Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(keys.get(0), partitionName);
try {
List<Long> timings = new ArrayList<>();
- HoodieTimer timer = new HoodieTimer().startTimer();
HoodieFileReader baseFileReader = readers.getKey();
- HoodieMetadataMergedLogRecordScanner logRecordScanner = readers.getRight();
+ HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+
+ // local map to assist in merging with base file records
+ Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = readLogRecords(logRecordScanner, keys, timings);
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = readFromBaseAndMergeWithLogRecords(baseFileReader,
+ keys, logRecords, timings);
+ LOG.info(String.format("Metadata read for %s keys took [baseFileRead, logMerge] %s ms", keys.size(), timings));
+ return result;
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe);
+ } finally {
+ if (!reuse) {
+ close(partitionName);
+ }
+ }
+ }
+
+ private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner,
+ List<String> keys, List<Long> timings) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new HashMap<>();
+ // Retrieve records from log file
+ timer.startTimer();
+ if (logRecordScanner != null) {
+ if (metadataConfig.enableFullScan()) {
+ // path which does full scan of log files
+ for (String key : keys) {
+ logRecords.put(key,
logRecordScanner.getRecordByKey(key).get(0).getValue());
+ }
+ } else {
+ // this path will do seeks pertaining to the keys passed in
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
logRecordsList = logRecordScanner.getRecordsByKeys(keys);
+ for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry :
logRecordsList) {
+ logRecords.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ timings.add(timer.endTimer());
+ return logRecords;
+ }
- // Retrieve record from base file
- HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
- if (baseFileReader != null) {
- HoodieTimer readTimer = new HoodieTimer().startTimer();
+ private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader,
+
List<String> keys, Map<String,
Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
+
List<Long> timings) throws IOException {
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result =
new ArrayList<>();
+ // merge with base records
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ timer.startTimer();
+ HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
+ // Retrieve record from base file
+ if (baseFileReader != null) {
+ HoodieTimer readTimer = new HoodieTimer().startTimer();
+ for (String key : keys) {
Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
if (baseRecord.isPresent()) {
hoodieRecord = metadataTableConfig.populateMetaFields()
@@ -139,46 +189,45 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
:
SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(),
Pair.of(metadataTableConfig.getRecordKeyFieldProp(),
metadataTableConfig.getPartitionFieldProp()), false);
metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
- }
- }
- timings.add(timer.endTimer());
-
- // Retrieve record from log file
- timer.startTimer();
- if (logRecordScanner != null) {
- Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord =
logRecordScanner.getRecordByKey(key);
- if (logHoodieRecord.isPresent()) {
- if (hoodieRecord != null) {
- // Merge the payloads
- HoodieRecordPayload mergedPayload =
logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
- hoodieRecord = new HoodieRecord(hoodieRecord.getKey(),
mergedPayload);
+ // merge base file record w/ log record if present
+ if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) {
+ HoodieRecordPayload mergedPayload =
logRecords.get(key).get().getData().preCombine(hoodieRecord.getData());
+ result.add(Pair.of(key, Option.of(new
HoodieRecord(hoodieRecord.getKey(), mergedPayload))));
} else {
- hoodieRecord = logHoodieRecord.get();
+ // only base record
+ result.add(Pair.of(key, Option.of(hoodieRecord)));
+ }
+ } else {
+ // only log record
+ if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) {
+ // no base record for this key; the log record is the latest value
+ result.add(Pair.of(key, logRecords.get(key)));
+ } else { // not found in both base file and log files
+ result.add(Pair.of(key, Option.empty()));
}
}
}
timings.add(timer.endTimer());
- LOG.info(String.format("Metadata read for key %s took [baseFileRead,
logMerge] %s ms", key, timings));
- return Option.ofNullable(hoodieRecord);
- } catch (IOException ioe) {
- throw new HoodieIOException("Error merging records from metadata table
for key :" + key, ioe);
- } finally {
- if (!reuse) {
- close(partitionName);
+ } else {
+ // no base file at all
+ timings.add(timer.endTimer());
+ for (Map.Entry<String, Option<HoodieRecord<HoodieMetadataPayload>>>
entry : logRecords.entrySet()) {
+ result.add(Pair.of(entry.getKey(), entry.getValue()));
}
}
+ return result;
}
/**
* Returns a new pair of readers to the base and log files.
*/
- private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner>
openReadersIfNeeded(String key, String partitionName) {
+ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader>
openReadersIfNeeded(String key, String partitionName) {
return partitionReaders.computeIfAbsent(partitionName, k -> {
try {
final long baseFileOpenMs;
final long logScannerOpenMs;
HoodieFileReader baseFileReader = null;
- HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
+ HoodieMetadataMergedLogRecordReader logRecordScanner = null;
// Metadata is in sync till the latest completed instant on the dataset
HoodieTimer timer = new HoodieTimer().startTimer();
@@ -192,7 +241,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
// Open the log record scanner using the log files from the latest
file slice
- Pair<HoodieMetadataMergedLogRecordScanner, Long>
logRecordScannerOpenTimePair = getLogRecordScanner(slice);
+ Pair<HoodieMetadataMergedLogRecordReader, Long>
logRecordScannerOpenTimePair = getLogRecordScanner(slice);
logRecordScanner = logRecordScannerOpenTimePair.getKey();
logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
@@ -244,7 +293,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return validInstantTimestamps;
}
- private Pair<HoodieMetadataMergedLogRecordScanner, Long>
getLogRecordScanner(FileSlice slice) {
+ private Pair<HoodieMetadataMergedLogRecordReader, Long>
getLogRecordScanner(FileSlice slice) {
HoodieTimer timer = new HoodieTimer().startTimer();
List<String> logFilePaths = slice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
@@ -261,7 +310,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
// Load the schema
Schema schema =
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
HoodieCommonConfig commonConfig =
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
- HoodieMetadataMergedLogRecordScanner logRecordScanner =
HoodieMetadataMergedLogRecordScanner.newBuilder()
+ HoodieMetadataMergedLogRecordReader logRecordScanner =
HoodieMetadataMergedLogRecordReader.newBuilder()
.withFileSystem(metadataMetaClient.getFs())
.withBasePath(metadataBasePath)
.withLogFilePaths(logFilePaths)
@@ -273,6 +322,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
.withDiskMapType(commonConfig.getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
.withLogBlockTimestamps(validInstantTimestamps)
+ .enableFullScan(metadataConfig.enableFullScan())
.build();
Long logScannerOpenMs = timer.endTimer();
@@ -319,7 +369,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
private synchronized void close(String partitionName) {
- Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> readers =
partitionReaders.remove(partitionName);
+ Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers =
partitionReaders.remove(partitionName);
if (readers != null) {
try {
if (readers.getKey() != null) {
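To summarize the merge logic introduced in readFromBaseAndMergeWithLogRecords: for each requested key, the base-file record (if present) is pre-combined with the matching log record; otherwise whichever side holds the record wins, and a key missing on both sides maps to Option.empty(). A minimal per-key sketch (illustrative only, not part of the commit, assuming HoodieMetadataPayload.preCombine as the merge function) is:

// Sketch of the per-key merge: the newer log record's payload is pre-combined
// with the base payload when both sides are present.
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.HoodieMetadataPayload;

final class MetadataMergeSketch {
  static Option<HoodieRecord<HoodieMetadataPayload>> merge(
      Option<HoodieRecord<HoodieMetadataPayload>> baseRecord,
      Option<HoodieRecord<HoodieMetadataPayload>> logRecord) {
    if (baseRecord.isPresent() && logRecord.isPresent()) {
      HoodieMetadataPayload merged =
          logRecord.get().getData().preCombine(baseRecord.get().getData());
      return Option.of(new HoodieRecord<>(baseRecord.get().getKey(), merged));
    }
    // only one side (or neither) has the record
    return baseRecord.isPresent() ? baseRecord : logRecord;
  }
}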
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
similarity index 76%
rename from
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
rename to
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
index 3132ea6..131ca3b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
@@ -19,12 +19,16 @@
package org.apache.hudi.metadata;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -32,26 +36,30 @@ import
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
/**
 * A {@code HoodieMergedLogRecordScanner} implementation which only merges
records matching the provided keys. This is
 * useful in limiting memory usage when only a small subset of records
is to be read.
*/
-public class HoodieMetadataMergedLogRecordScanner extends
HoodieMergedLogRecordScanner {
+public class HoodieMetadataMergedLogRecordReader extends
HoodieMergedLogRecordScanner {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieMetadataMergedLogRecordReader.class);
// Set of all record keys that are to be read in memory
private Set<String> mergeKeyFilter;
- private HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath,
List<String> logFilePaths,
+ private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath,
List<String> logFilePaths,
Schema readerSchema, String
latestInstantTime, Long maxMemorySizeInBytes, int bufferSize,
String spillableMapBasePath,
Set<String> mergeKeyFilter,
ExternalSpillableMap.DiskMapType
diskMapType, boolean isBitCaskDiskMapCompressionEnabled,
- Option<InstantRange>
instantRange) {
+ Option<InstantRange>
instantRange, boolean enableFullScan) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime,
maxMemorySizeInBytes, false, false, bufferSize,
- spillableMapBasePath, instantRange, false, diskMapType,
isBitCaskDiskMapCompressionEnabled, false);
+ spillableMapBasePath, instantRange, false, diskMapType,
isBitCaskDiskMapCompressionEnabled, false, enableFullScan);
this.mergeKeyFilter = mergeKeyFilter;
-
- performScan();
+ if (enableFullScan) {
+ performScan();
+ }
}
@Override
@@ -71,8 +79,8 @@ public class HoodieMetadataMergedLogRecordScanner extends
HoodieMergedLogRecordS
/**
* Returns the builder for {@code HoodieMetadataMergedLogRecordScanner}.
*/
- public static HoodieMetadataMergedLogRecordScanner.Builder newBuilder() {
- return new HoodieMetadataMergedLogRecordScanner.Builder();
+ public static HoodieMetadataMergedLogRecordReader.Builder newBuilder() {
+ return new HoodieMetadataMergedLogRecordReader.Builder();
}
/**
@@ -81,8 +89,22 @@ public class HoodieMetadataMergedLogRecordScanner extends
HoodieMergedLogRecordS
* @param key Key of the record to retrieve
* @return {@code HoodieRecord} if key was found else {@code Option.empty()}
*/
- public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String
key) {
- return Option.ofNullable((HoodieRecord) records.get(key));
+ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
getRecordByKey(String key) {
+ return Collections.singletonList(Pair.of(key,
Option.ofNullable((HoodieRecord) records.get(key))));
+ }
+
+ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
getRecordsByKeys(List<String> keys) {
+ records.clear();
+ scan(Option.of(keys));
+ List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>>
metadataRecords = new ArrayList<>();
+ keys.forEach(entry -> {
+ if (records.containsKey(entry)) {
+ metadataRecords.add(Pair.of(entry, Option.ofNullable((HoodieRecord)
records.get(entry))));
+ } else {
+ metadataRecords.add(Pair.of(entry, Option.empty()));
+ }
+ });
+ return metadataRecords;
}
/**
@@ -90,6 +112,8 @@ public class HoodieMetadataMergedLogRecordScanner extends
HoodieMergedLogRecordS
*/
public static class Builder extends HoodieMergedLogRecordScanner.Builder {
private Set<String> mergeKeyFilter = Collections.emptySet();
+ private boolean enableFullScan;
+ private boolean enableInlineReading;
@Override
public Builder withFileSystem(FileSystem fs) {
@@ -171,11 +195,16 @@ public class HoodieMetadataMergedLogRecordScanner extends
HoodieMergedLogRecordS
return this;
}
+ public Builder enableFullScan(boolean enableFullScan) {
+ this.enableFullScan = enableFullScan;
+ return this;
+ }
+
@Override
- public HoodieMetadataMergedLogRecordScanner build() {
- return new HoodieMetadataMergedLogRecordScanner(fs, basePath,
logFilePaths, readerSchema,
+ public HoodieMetadataMergedLogRecordReader build() {
+ return new HoodieMetadataMergedLogRecordReader(fs, basePath,
logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, bufferSize,
spillableMapBasePath, mergeKeyFilter,
- diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange);
+ diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange,
enableFullScan);
}
}
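The renamed HoodieMetadataMergedLogRecordReader defers its scan when enableFullScan is false, so callers can issue seek-based batch lookups instead of materializing all log records. A usage sketch (illustrative only; fs, metadataBasePath, logFilePaths and the elided builder settings are assumed to be set up as in getLogRecordScanner above, and the keys are placeholders) looks like:

// Usage sketch (not part of the commit): build the reader with full scan disabled
// and fetch a batch of keys via seek-based reads of the HFile log blocks.
HoodieMetadataMergedLogRecordReader logRecordReader =
    HoodieMetadataMergedLogRecordReader.newBuilder()
        .withFileSystem(fs)
        .withBasePath(metadataBasePath)
        .withLogFilePaths(logFilePaths)
        // ... schema, latest instant time, memory/spill-map and disk-map settings
        // as configured in getLogRecordScanner(...) ...
        .enableFullScan(false)
        .build();

// Each pair carries the requested key and, if found, its merged log record.
List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> results =
    logRecordReader.getRecordsByKeys(Arrays.asList("2021/10/01", "2021/10/02"));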
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index a647da9..1771db0 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -137,6 +138,24 @@ public class TestHoodieRealtimeRecordReader {
public void testReader(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean partitioned) throws Exception {
+ testReaderInternal(diskMapType, isCompressionEnabled, partitioned);
+ }
+
+ @Test
+ public void testHFileInlineReader() throws Exception {
+ testReaderInternal(ExternalSpillableMap.DiskMapType.BITCASK, false, false,
+ HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK);
+ }
+
+ private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
+ boolean isCompressionEnabled,
+ boolean partitioned) throws Exception {
+ testReaderInternal(diskMapType, isCompressionEnabled, partitioned,
HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
+ }
+
+ private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
+ boolean isCompressionEnabled,
+ boolean partitioned,
HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
// initial commit
Schema schema =
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(),
HoodieTableType.MERGE_ON_READ);
@@ -175,7 +194,7 @@ public class TestHoodieRealtimeRecordReader {
} else {
writer =
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs,
schema, "fileid0", baseInstant,
- instantTime, 120, 0, logVersion);
+ instantTime, 120, 0, logVersion, logBlockType);
}
long size = writer.getCurrentSize();
writer.close();
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index d10ccfc..13d9219 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -27,6 +27,8 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -301,7 +303,14 @@ public class InputFormatTestUtil {
public static HoodieLogFormat.Writer writeDataBlockToLogFile(File
partitionDir, FileSystem fs, Schema schema, String
fileId,
- String
baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion)
+ String
baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion)
throws IOException, InterruptedException {
+ return writeDataBlockToLogFile(partitionDir, fs, schema, fileId,
baseCommit, newCommit, numberOfRecords, offset, logVersion,
HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
+ }
+
+ public static HoodieLogFormat.Writer writeDataBlockToLogFile(File
partitionDir, FileSystem fs, Schema schema, String
+ fileId,
+ String
baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion,
+
HoodieLogBlock.HoodieLogBlockType logBlockType)
throws InterruptedException, IOException {
HoodieLogFormat.Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(new
Path(partitionDir.getPath()))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).withLogVersion(logVersion)
@@ -314,7 +323,8 @@ public class InputFormatTestUtil {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
writeSchema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+ HoodieDataBlock dataBlock = (logBlockType ==
HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) ? new
HoodieHFileDataBlock(records, header) :
+ new HoodieAvroDataBlock(records, header);
writer.appendBlock(dataBlock);
return writer;
}
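With the new InputFormatTestUtil overload, tests can exercise HFile log blocks directly; the realtime reader test above passes the block type through as follows (a sketch mirroring the call in the diff, with record count, offset and log version unchanged):

// Mirrors the test change above: write the data block as an HFile block instead of Avro.
HoodieLogFormat.Writer writer = InputFormatTestUtil.writeDataBlockToLogFile(
    partitionDir, fs, schema, "fileid0", baseInstant, instantTime,
    120, 0, logVersion, HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK);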