This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 318486662b7 [HUDI-9012] Implement and utilize native HFile writer
(#12866)
318486662b7 is described below
commit 318486662b750bf24f0211fa37346612505d28e7
Author: Lin Liu <[email protected]>
AuthorDate: Sun May 25 11:23:45 2025 -0700
[HUDI-9012] Implement and utilize native HFile writer (#12866)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../bootstrap/index/hfile/HFileBootstrapIndex.java | 7 +-
.../index/hfile/HFileBootstrapIndexWriter.java | 59 ++---
.../org/apache/hudi/common/util/HFileUtils.java | 73 +++---
.../apache/hudi/common/util/TestHFileUtils.java | 11 +-
.../io/hadoop/HoodieAvroFileWriterFactory.java | 15 +-
.../hudi/io/hadoop/HoodieAvroHFileWriter.java | 82 +++----
.../apache/hudi/io/hadoop/HoodieHFileConfig.java | 66 ++----
.../hadoop/TestHoodieHBaseHFileReaderWriter.java | 3 +-
.../io/hadoop/TestHoodieHFileReaderWriterBase.java | 6 +-
.../apache/hudi/io/compress/CompressionCodec.java | 79 ++++++-
...odieDecompressor.java => HoodieCompressor.java} | 23 +-
...orFactory.java => HoodieCompressorFactory.java} | 19 +-
...essor.java => HoodieAirliftGzipCompressor.java} | 27 ++-
...Decompressor.java => HoodieNoneCompressor.java} | 17 +-
.../org/apache/hudi/io/hfile/ChecksumType.java | 91 +++++++
.../java/org/apache/hudi/io/hfile/HFileBlock.java | 123 +++++++++-
.../org/apache/hudi/io/hfile/HFileBlockType.java | 4 +
.../org/apache/hudi/io/hfile/HFileContext.java | 39 ++-
.../org/apache/hudi/io/hfile/HFileDataBlock.java | 78 +++++-
.../apache/hudi/io/hfile/HFileFileInfoBlock.java | 45 ++++
.../org/apache/hudi/io/hfile/HFileIndexBlock.java | 55 +++++
.../java/org/apache/hudi/io/hfile/HFileInfo.java | 4 +-
.../apache/hudi/io/hfile/HFileLeafIndexBlock.java | 7 +
.../org/apache/hudi/io/hfile/HFileMetaBlock.java | 28 +++
.../apache/hudi/io/hfile/HFileMetaIndexBlock.java | 59 +++++
.../org/apache/hudi/io/hfile/HFileReaderImpl.java | 8 +
.../apache/hudi/io/hfile/HFileRootIndexBlock.java | 47 +++-
.../org/apache/hudi/io/hfile/HFileTrailer.java | 3 +-
.../java/org/apache/hudi/io/hfile/HFileUtils.java | 40 ----
.../{HFileMetaBlock.java => HFileWriter.java} | 32 ++-
.../org/apache/hudi/io/hfile/HFileWriterImpl.java | 233 ++++++++++++++++++
.../{HFileMetaBlock.java => KeyValueEntry.java} | 23 +-
.../org/apache/hudi/io/hfile/UTF8StringKey.java | 1 -
...Decompressor.java => TestHoodieCompressor.java} | 22 +-
.../org/apache/hudi/io/hfile/TestHFileWriter.java | 263 +++++++++++++++++++++
35 files changed, 1367 insertions(+), 325 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndex.java
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndex.java
index 9901aa1de7b..fd342288cf0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndex.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndex.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.HoodieStorage;
@@ -144,9 +143,7 @@ public class HFileBootstrapIndex extends BootstrapIndex {
@Override
public BootstrapIndex.IndexWriter createWriter(String bootstrapBasePath) {
- return (IndexWriter)
ReflectionUtils.loadClass("org.apache.hudi.common.bootstrap.index.hfile.HBaseHFileBootstrapIndexWriter",
- new Class<?>[] {String.class, HoodieTableMetaClient.class},
- bootstrapBasePath, metaClient);
+ return new HFileBootstrapIndexWriter(bootstrapBasePath, metaClient);
}
@Override
@@ -155,7 +152,7 @@ public class HFileBootstrapIndex extends BootstrapIndex {
StoragePath[] indexPaths = new StoragePath[]
{partitionIndexPath(metaClient), fileIdIndexPath(metaClient)};
for (StoragePath indexPath : indexPaths) {
if (metaClient.getStorage().exists(indexPath)) {
- LOG.info("Dropping bootstrap index. Deleting file : " + indexPath);
+ LOG.info("Dropping bootstrap index. Deleting file: {}", indexPath);
metaClient.getStorage().deleteDirectory(indexPath);
}
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HBaseHFileBootstrapIndexWriter.java
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndexWriter.java
similarity index 76%
rename from
hudi-hadoop-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HBaseHFileBootstrapIndexWriter.java
rename to
hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndexWriter.java
index 9ffacdc6112..bcd063ff5d0 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HBaseHFileBootstrapIndexWriter.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/hfile/HFileBootstrapIndexWriter.java
@@ -30,21 +30,16 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.hfile.HFileContext;
+import org.apache.hudi.io.hfile.HFileWriter;
+import org.apache.hudi.io.hfile.HFileWriterImpl;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
@@ -52,21 +47,20 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static
org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.INDEX_INFO_KEY;
+import static
org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.INDEX_INFO_KEY_STRING;
import static
org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.fileIdIndexPath;
import static
org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.getFileGroupKey;
import static
org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.getPartitionKey;
import static
org.apache.hudi.common.bootstrap.index.hfile.HFileBootstrapIndex.partitionIndexPath;
-import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-public class HBaseHFileBootstrapIndexWriter extends BootstrapIndex.IndexWriter
{
- private static final Logger LOG =
LoggerFactory.getLogger(HBaseHFileBootstrapIndexWriter.class);
+public class HFileBootstrapIndexWriter extends BootstrapIndex.IndexWriter {
+ private static final Logger LOG =
LoggerFactory.getLogger(HFileBootstrapIndexWriter.class);
private final String bootstrapBasePath;
private final StoragePath indexByPartitionPath;
private final StoragePath indexByFileIdPath;
- private HFile.Writer indexByPartitionWriter;
- private HFile.Writer indexByFileIdWriter;
+ private HFileWriter indexByPartitionWriter;
+ private HFileWriter indexByFileIdWriter;
private boolean closed = false;
private int numPartitionKeysAdded = 0;
@@ -74,7 +68,7 @@ public class HBaseHFileBootstrapIndexWriter extends
BootstrapIndex.IndexWriter {
private final Map<String, List<BootstrapFileMapping>> sourceFileMappings =
new HashMap<>();
- public HBaseHFileBootstrapIndexWriter(String bootstrapBasePath,
HoodieTableMetaClient metaClient) {
+ public HFileBootstrapIndexWriter(String bootstrapBasePath,
HoodieTableMetaClient metaClient) {
super(metaClient);
try {
metaClient.initializeBootstrapDirsIfNotExists();
@@ -114,9 +108,7 @@ public class HBaseHFileBootstrapIndexWriter extends
BootstrapIndex.IndexWriter {
m.getBootstrapFileStatus())).collect(Collectors.toMap(Pair::getKey,
Pair::getValue)));
Option<byte[]> bytes =
TimelineMetadataUtils.serializeAvroMetadata(bootstrapPartitionMetadata,
HoodieBootstrapPartitionMetadata.class);
if (bytes.isPresent()) {
- indexByPartitionWriter
- .append(new KeyValue(getUTF8Bytes(getPartitionKey(partitionPath)),
new byte[0], new byte[0],
- HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put, bytes.get()));
+ indexByPartitionWriter.append(getPartitionKey(partitionPath),
bytes.get());
numPartitionKeysAdded++;
}
} catch (IOException e) {
@@ -135,11 +127,10 @@ public class HBaseHFileBootstrapIndexWriter extends
BootstrapIndex.IndexWriter {
srcFilePartitionInfo.setPartitionPath(mapping.getPartitionPath());
srcFilePartitionInfo.setBootstrapPartitionPath(mapping.getBootstrapPartitionPath());
srcFilePartitionInfo.setBootstrapFileStatus(mapping.getBootstrapFileStatus());
- KeyValue kv = new
KeyValue(getUTF8Bytes(getFileGroupKey(mapping.getFileGroupId())), new byte[0],
new byte[0],
- HConstants.LATEST_TIMESTAMP, KeyValue.Type.Put,
- TimelineMetadataUtils.serializeAvroMetadata(srcFilePartitionInfo,
- HoodieBootstrapFilePartitionInfo.class).get());
- indexByFileIdWriter.append(kv);
+ indexByFileIdWriter.append(
+ getFileGroupKey(mapping.getFileGroupId()),
+ TimelineMetadataUtils.serializeAvroMetadata(
+ srcFilePartitionInfo,
HoodieBootstrapFilePartitionInfo.class).get());
numFileIdKeysAdded++;
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
@@ -166,9 +157,11 @@ public class HBaseHFileBootstrapIndexWriter extends
BootstrapIndex.IndexWriter {
.build();
LOG.info("Appending FileId FileInfo :" + fileIdIndexInfo);
- indexByPartitionWriter.appendFileInfo(INDEX_INFO_KEY,
+ indexByPartitionWriter.appendFileInfo(
+ INDEX_INFO_KEY_STRING,
TimelineMetadataUtils.serializeAvroMetadata(partitionIndexInfo,
HoodieBootstrapIndexInfo.class).get());
- indexByFileIdWriter.appendFileInfo(INDEX_INFO_KEY,
+ indexByFileIdWriter.appendFileInfo(
+ INDEX_INFO_KEY_STRING,
TimelineMetadataUtils.serializeAvroMetadata(fileIdIndexInfo,
HoodieBootstrapIndexInfo.class).get());
close();
@@ -196,15 +189,11 @@ public class HBaseHFileBootstrapIndexWriter extends
BootstrapIndex.IndexWriter {
@Override
public void begin() {
try {
- HFileContext meta = new HFileContextBuilder().withCellComparator(new
org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex.HoodieKVComparator()).build();
- this.indexByPartitionWriter =
HFile.getWriterFactory(metaClient.getStorageConf().unwrapAs(Configuration.class),
- new
CacheConfig(metaClient.getStorageConf().unwrapAs(Configuration.class)))
- .withPath((FileSystem) metaClient.getStorage().getFileSystem(), new
Path(indexByPartitionPath.toUri()))
- .withFileContext(meta).create();
- this.indexByFileIdWriter =
HFile.getWriterFactory(metaClient.getStorageConf().unwrapAs(Configuration.class),
- new
CacheConfig(metaClient.getStorageConf().unwrapAs(Configuration.class)))
- .withPath((FileSystem) metaClient.getStorage().getFileSystem(), new
Path(indexByFileIdPath.toUri()))
- .withFileContext(meta).create();
+ HFileContext context = HFileContext.builder().build();
+ OutputStream outputStreamForPartitionWriter =
metaClient.getStorage().create(indexByPartitionPath);
+ this.indexByPartitionWriter = new HFileWriterImpl(context,
outputStreamForPartitionWriter);
+ OutputStream outputStreamForFileIdWriter =
metaClient.getStorage().create(indexByFileIdPath);
+ this.indexByFileIdWriter = new HFileWriterImpl(context,
outputStreamForFileIdWriter);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
similarity index 82%
rename from
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
rename to hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index 001a4dd3474..6238fbcb920 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -29,9 +29,11 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.compress.CompressionCodec;
+import org.apache.hudi.io.hfile.HFileContext;
+import org.apache.hudi.io.hfile.HFileWriter;
+import org.apache.hudi.io.hfile.HFileWriterImpl;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.storage.HoodieStorage;
@@ -39,19 +41,13 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -70,17 +66,15 @@ public class HFileUtils extends FileFormatUtils {
private static final int DEFAULT_BLOCK_SIZE_FOR_LOG_FILE = 1024 * 1024;
/**
- * Gets the {@link Compression.Algorithm} Enum based on the {@link
CompressionCodec} name.
- *
* @param paramsMap parameter map containing the compression codec config.
- * @return the {@link Compression.Algorithm} Enum.
+ * @return the {@link CompressionCodec} Enum.
*/
- public static Compression.Algorithm getHFileCompressionAlgorithm(Map<String,
String> paramsMap) {
- String algoName = paramsMap.get(HFILE_COMPRESSION_ALGORITHM_NAME.key());
- if (StringUtils.isNullOrEmpty(algoName)) {
- return Compression.Algorithm.GZ;
+ public static CompressionCodec getHFileCompressionAlgorithm(Map<String,
String> paramsMap) {
+ String codecName = paramsMap.get(HFILE_COMPRESSION_ALGORITHM_NAME.key());
+ if (StringUtils.isNullOrEmpty(codecName)) {
+ return CompressionCodec.GZIP;
}
- return Compression.Algorithm.valueOf(algoName.toUpperCase());
+ return CompressionCodec.findCodecByName(codecName);
}
@Override
@@ -167,17 +161,9 @@ public class HFileUtils extends FileFormatUtils {
Schema readerSchema,
String keyFieldName,
Map<String, String> paramsMap)
throws IOException {
- Compression.Algorithm compressionAlgorithm =
getHFileCompressionAlgorithm(paramsMap);
- HFileContext context = new HFileContextBuilder()
- .withBlockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
- .withCompression(compressionAlgorithm)
- .withCellComparator(new HoodieHBaseKVComparator())
- .build();
-
- Configuration conf = storage.getConf().unwrapAs(Configuration.class);
- CacheConfig cacheConfig = new CacheConfig(conf);
+ CompressionCodec compressionCodec =
getHFileCompressionAlgorithm(paramsMap);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+ OutputStream ostream = new DataOutputStream(baos);
// Use simple incrementing counter as a key
boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema,
keyFieldName).isPresent();
@@ -209,26 +195,25 @@ public class HFileUtils extends FileFormatUtils {
sortedRecordsMap.put(recordKey, recordBytes);
}
- HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
- .withOutputStream(ostream).withFileContext(context).create();
-
- // Write the records
- sortedRecordsMap.forEach((recordKey, recordBytes) -> {
- try {
- KeyValue kv = new KeyValue(recordKey.getBytes(), null, null,
recordBytes);
- writer.append(kv);
- } catch (IOException e) {
- throw new HoodieIOException("IOException serializing records", e);
- }
- });
-
- writer.appendFileInfo(
- getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY),
getUTF8Bytes(readerSchema.toString()));
+ HFileContext context = HFileContext.builder()
+ .blockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
+ .compressionCodec(compressionCodec)
+ .build();
+ try (HFileWriter writer = new HFileWriterImpl(context, ostream)) {
+ sortedRecordsMap.forEach((recordKey,recordBytes) -> {
+ try {
+ writer.append(recordKey, recordBytes);
+ } catch (IOException e) {
+ throw new HoodieIOException("IOException serializing records", e);
+ }
+ });
+ writer.appendFileInfo(
+ HoodieAvroHFileReaderImplBase.SCHEMA_KEY,
+ getUTF8Bytes(readerSchema.toString()));
+ }
- writer.close();
ostream.flush();
ostream.close();
-
return baos;
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
similarity index 84%
rename from
hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
rename to
hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
index c88dced4ab3..fbf9b196f42 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
@@ -19,7 +19,8 @@
package org.apache.hudi.common.util;
-import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hudi.io.compress.CompressionCodec;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -36,8 +37,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
*/
public class TestHFileUtils {
@ParameterizedTest
- @EnumSource(Compression.Algorithm.class)
- public void testGetHFileCompressionAlgorithm(Compression.Algorithm algo) {
+ @EnumSource(CompressionCodec.class)
+ public void testGetHFileCompressionAlgorithm(CompressionCodec algo) {
for (boolean upperCase : new boolean[] {true, false}) {
Map<String, String> paramsMap = Collections.singletonMap(
HFILE_COMPRESSION_ALGORITHM_NAME.key(),
@@ -48,12 +49,12 @@ public class TestHFileUtils {
@Test
public void testGetHFileCompressionAlgorithmWithEmptyString() {
- assertEquals(Compression.Algorithm.GZ, getHFileCompressionAlgorithm(
+ assertEquals(CompressionCodec.GZIP, getHFileCompressionAlgorithm(
Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(), "")));
}
@Test
public void testGetDefaultHFileCompressionAlgorithm() {
- assertEquals(Compression.Algorithm.GZ,
getHFileCompressionAlgorithm(Collections.emptyMap()));
+ assertEquals(CompressionCodec.GZIP,
getHFileCompressionAlgorithm(Collections.emptyMap()));
}
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
index aaf583afefb..eac3f07681e 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.io.compress.CompressionCodec;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
@@ -38,7 +39,6 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.orc.CompressionKind;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -48,11 +48,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;
-import static org.apache.hudi.io.hadoop.HoodieHFileConfig.CACHE_DATA_IN_L1;
-import static
org.apache.hudi.io.hadoop.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION;
-import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR;
-import static org.apache.hudi.io.hadoop.HoodieHFileConfig.PREFETCH_ON_OPEN;
-
public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
public HoodieAvroFileWriterFactory(HoodieStorage storage) {
@@ -98,13 +93,13 @@ public class HoodieAvroFileWriterFactory extends
HoodieFileWriterFactory {
String instantTime, StoragePath path, HoodieConfig config, Schema schema,
TaskContextSupplier taskContextSupplier) throws IOException {
BloomFilter filter = createBloomFilter(config);
- HoodieHFileConfig hfileConfig = new
HoodieHFileConfig(storage.getConf().unwrapAs(Configuration.class),
- Compression.Algorithm.valueOf(
+ HoodieHFileConfig hfileConfig = new HoodieHFileConfig(
+ storage.getConf(),
+ CompressionCodec.findCodecByName(
config.getString(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME)),
config.getInt(HoodieStorageConfig.HFILE_BLOCK_SIZE),
config.getLong(HoodieStorageConfig.HFILE_MAX_FILE_SIZE),
- HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME,
- PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION,
filter, HFILE_COMPARATOR);
+ HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME, filter);
return new HoodieAvroHFileWriter(instantTime, path, hfileConfig, schema,
taskContextSupplier, config.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS));
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
index b81f57beb69..59ac7f3c352 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
@@ -27,9 +27,15 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieDuplicateKeyException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.hfile.HFileContext;
+import org.apache.hudi.io.hfile.HFileWriter;
+import org.apache.hudi.io.hfile.HFileWriterImpl;
import org.apache.hudi.io.storage.HoodieAvroFileWriter;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -37,17 +43,9 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.io.Writable;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
@@ -73,18 +71,14 @@ public class HoodieAvroHFileWriter
private final TaskContextSupplier taskContextSupplier;
private final boolean populateMetaFields;
private final Option<Schema.Field> keyFieldSchema;
- private HFile.Writer writer;
+ private HFileWriter writer;
private String minRecordKey;
private String maxRecordKey;
private String prevRecordKey;
- // This is private in CacheConfig so have been copied here.
- private static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
"hbase.hfile.drop.behind.compaction";
-
public HoodieAvroHFileWriter(String instantTime, StoragePath file,
HoodieHFileConfig hfileConfig, Schema schema,
TaskContextSupplier taskContextSupplier,
boolean populateMetaFields) throws IOException {
-
- Configuration conf = HadoopFSUtils.registerFileSystem(file,
hfileConfig.getHadoopConf());
+ Configuration conf = HadoopFSUtils.registerFileSystem(file,
(Configuration) hfileConfig.getStorageConf().unwrap());
this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
FileSystem fs = this.file.getFileSystem(conf);
this.isWrapperFileSystem = fs instanceof HoodieWrapperFileSystem;
@@ -101,25 +95,18 @@ public class HoodieAvroHFileWriter
this.taskContextSupplier = taskContextSupplier;
this.populateMetaFields = populateMetaFields;
- HFileContext context = new
HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
- .withCompression(hfileConfig.getCompressionAlgorithm())
- .withCellComparator(hfileConfig.getHFileComparator())
+ HFileContext context = new HFileContext.Builder()
+ .blockSize(hfileConfig.getBlockSize())
+ .compressionCodec(hfileConfig.getCompressionCodec())
.build();
+ StorageConfiguration<Configuration> storageConf = new
HadoopStorageConfiguration(conf);
+ StoragePath filePath = new StoragePath(this.file.toUri());
+ OutputStream outputStream = HoodieStorageUtils.getStorage(filePath,
storageConf).create(filePath);
+ this.writer = new HFileWriterImpl(context, outputStream);
- conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY,
- String.valueOf(hfileConfig.shouldPrefetchBlocksOnOpen()));
- conf.set(HColumnDescriptor.CACHE_DATA_IN_L1,
String.valueOf(hfileConfig.shouldCacheDataInL1()));
- conf.set(DROP_BEHIND_CACHE_COMPACTION_KEY,
- String.valueOf(hfileConfig.shouldDropBehindCacheCompaction()));
- CacheConfig cacheConfig = new CacheConfig(conf);
- this.writer = HFile.getWriterFactory(conf, cacheConfig)
- .withPath(fs, this.file)
- .withFileContext(context)
- .create();
-
-
writer.appendFileInfo(getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY),
- getUTF8Bytes(schema.toString()));
this.prevRecordKey = "";
+ writer.appendFileInfo(
+ HoodieAvroHFileReaderImplBase.SCHEMA_KEY,
getUTF8Bytes(schema.toString()));
}
@Override
@@ -149,7 +136,8 @@ public class HoodieAvroHFileWriter
if (keyFieldSchema.isPresent()) {
GenericRecord keyExcludedRecord = (GenericRecord) record;
int keyFieldPos = this.keyFieldSchema.get().pos();
- boolean isKeyAvailable = (record.get(keyFieldPos) != null &&
!(record.get(keyFieldPos).toString().isEmpty()));
+ boolean isKeyAvailable = (record.get(keyFieldPos) != null
+ && !(record.get(keyFieldPos).toString().isEmpty()));
if (isKeyAvailable) {
Object originalKey = keyExcludedRecord.get(keyFieldPos);
keyExcludedRecord.put(keyFieldPos, EMPTY_STRING);
@@ -161,9 +149,7 @@ public class HoodieAvroHFileWriter
if (!isRecordSerialized) {
value = HoodieAvroUtils.avroToBytes((GenericRecord) record);
}
-
- KeyValue kv = new KeyValue(getUTF8Bytes(recordKey), null, null, value);
- writer.append(kv);
+ writer.append(recordKey, value);
if (hfileConfig.useBloomFilter()) {
hfileConfig.getBloomFilter().add(recordKey);
@@ -185,25 +171,17 @@ public class HoodieAvroHFileWriter
if (maxRecordKey == null) {
maxRecordKey = "";
}
-
writer.appendFileInfo(getUTF8Bytes(HoodieAvroHFileReaderImplBase.KEY_MIN_RECORD),
- getUTF8Bytes(minRecordKey));
-
writer.appendFileInfo(getUTF8Bytes(HoodieAvroHFileReaderImplBase.KEY_MAX_RECORD),
- getUTF8Bytes(maxRecordKey));
-
writer.appendFileInfo(getUTF8Bytes(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE),
+ writer.appendFileInfo(
+ HoodieAvroHFileReaderImplBase.KEY_MIN_RECORD,
getUTF8Bytes(minRecordKey));
+ writer.appendFileInfo(
+ HoodieAvroHFileReaderImplBase.KEY_MAX_RECORD,
getUTF8Bytes(maxRecordKey));
+ writer.appendFileInfo(
+ HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE,
getUTF8Bytes(bloomFilter.getBloomFilterTypeCode().toString()));
-
writer.appendMetaBlock(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK,
- new Writable() {
- @Override
- public void write(DataOutput out) throws IOException {
- out.write(getUTF8Bytes(bloomFilter.serializeToString()));
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- }
- });
+ writer.appendMetaInfo(
+ HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK,
+ getUTF8Bytes(bloomFilter.serializeToString()));
}
-
writer.close();
writer = null;
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
index 83b659a6be0..37a225f734e 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
@@ -20,55 +20,37 @@
package org.apache.hudi.io.hadoop;
import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hudi.io.compress.CompressionCodec;
+import org.apache.hudi.storage.StorageConfiguration;
public class HoodieHFileConfig {
-
- public static final CellComparator HFILE_COMPARATOR = new
HoodieHBaseKVComparator();
- public static final boolean PREFETCH_ON_OPEN =
CacheConfig.DEFAULT_PREFETCH_ON_OPEN;
- public static final boolean CACHE_DATA_IN_L1 =
HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1;
- // This is private in CacheConfig so have been copied here.
- public static final boolean DROP_BEHIND_CACHE_COMPACTION = true;
-
- private final Compression.Algorithm compressionAlgorithm;
+ private final CompressionCodec compressionCodec;
private final int blockSize;
private final long maxFileSize;
- private final boolean prefetchBlocksOnOpen;
- private final boolean cacheDataInL1;
- private final boolean dropBehindCacheCompaction;
- private final Configuration hadoopConf;
+ private final StorageConfiguration storageConf;
private final BloomFilter bloomFilter;
- private final CellComparator hfileComparator;
private final String keyFieldName;
- public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm
compressionAlgorithm, int blockSize,
- long maxFileSize, String keyFieldName,
- boolean prefetchBlocksOnOpen, boolean
cacheDataInL1, boolean dropBehindCacheCompaction,
- BloomFilter bloomFilter, CellComparator
hfileComparator) {
- this.hadoopConf = hadoopConf;
- this.compressionAlgorithm = compressionAlgorithm;
+ public HoodieHFileConfig(StorageConfiguration storageConf,
+ CompressionCodec compressionCodec,
+ int blockSize,
+ long maxFileSize,
+ String keyFieldName,
+ BloomFilter bloomFilter) {
+ this.storageConf = storageConf;
+ this.compressionCodec = compressionCodec;
this.blockSize = blockSize;
this.maxFileSize = maxFileSize;
- this.prefetchBlocksOnOpen = prefetchBlocksOnOpen;
- this.cacheDataInL1 = cacheDataInL1;
- this.dropBehindCacheCompaction = dropBehindCacheCompaction;
this.bloomFilter = bloomFilter;
- this.hfileComparator = hfileComparator;
this.keyFieldName = keyFieldName;
}
- public Configuration getHadoopConf() {
- return hadoopConf;
+ public StorageConfiguration getStorageConf() {
+ return storageConf;
}
- public Compression.Algorithm getCompressionAlgorithm() {
- return compressionAlgorithm;
+ public CompressionCodec getCompressionCodec() {
+ return compressionCodec;
}
public int getBlockSize() {
@@ -79,18 +61,6 @@ public class HoodieHFileConfig {
return maxFileSize;
}
- public boolean shouldPrefetchBlocksOnOpen() {
- return prefetchBlocksOnOpen;
- }
-
- public boolean shouldCacheDataInL1() {
- return cacheDataInL1;
- }
-
- public boolean shouldDropBehindCacheCompaction() {
- return dropBehindCacheCompaction;
- }
-
  /**
   * @return {@code true} if a bloom filter was supplied (non-null) and should be written.
   */
  public boolean useBloomFilter() {
    return bloomFilter != null;
  }
@@ -99,10 +69,6 @@ public class HoodieHFileConfig {
return bloomFilter;
}
- public CellComparator getHFileComparator() {
- return hfileComparator;
- }
-
  /**
   * @return the name of the record field used as the HFile key.
   */
  public String getKeyFieldName() {
    return keyFieldName;
  }
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
index e0d307e2f03..ee1138800a6 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
@@ -56,7 +56,8 @@ import static
org.apache.hudi.io.hfile.TestHFileReader.VALUE_CREATOR;
import static
org.apache.hudi.io.storage.TestHoodieReaderWriterUtils.writeHFileForTesting;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class TestHoodieHBaseHFileReaderWriter extends
TestHoodieHFileReaderWriterBase {
+@Disabled("HUDI-9084")
+class TestHoodieHBaseHFileReaderWriter extends TestHoodieHFileReaderWriterBase
{
@Override
protected HoodieAvroFileReader createReader(
HoodieStorage storage) throws Exception {
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
index 9fce54b82fe..8c319298a24 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
@@ -33,6 +33,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
@@ -73,7 +74,6 @@ import java.util.stream.StreamSupport;
import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
import static
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
-import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR;
import static
org.apache.hudi.io.hfile.TestHFileReader.BOOTSTRAP_INDEX_HFILE_SUFFIX;
import static
org.apache.hudi.io.hfile.TestHFileReader.COMPLEX_SCHEMA_HFILE_SUFFIX;
import static
org.apache.hudi.io.hfile.TestHFileReader.SIMPLE_SCHEMA_HFILE_SUFFIX;
@@ -485,7 +485,7 @@ public abstract class TestHoodieHFileReaderWriterBase
extends TestHoodieReaderWr
FileSystem fs = HadoopFSUtils.getFs(getFilePath().toString(), new
Configuration());
byte[] content = readHFileFromResources(simpleHFile);
verifyHFileReader(
- content, hfilePrefix, true, HFILE_COMPARATOR.getClass(),
NUM_RECORDS_FIXTURE);
+ content, hfilePrefix, true, HoodieHBaseKVComparator.class,
NUM_RECORDS_FIXTURE);
HoodieStorage storage = HoodieTestUtils.getStorage(getFilePath());
try (HoodieAvroHFileReaderImplBase hfileReader =
createHFileReader(storage, content)) {
@@ -497,7 +497,7 @@ public abstract class TestHoodieHFileReaderWriterBase
extends TestHoodieReaderWr
content = readHFileFromResources(complexHFile);
verifyHFileReader(
- content, hfilePrefix, true, HFILE_COMPARATOR.getClass(),
NUM_RECORDS_FIXTURE);
+ content, hfilePrefix, true, HoodieHBaseKVComparator.class,
NUM_RECORDS_FIXTURE);
try (HoodieAvroHFileReaderImplBase hfileReader =
createHFileReader(storage, content)) {
Schema avroSchema =
getSchemaFromResource(TestHoodieReaderWriterBase.class,
"/exampleSchemaWithUDT.avsc");
diff --git
a/hudi-io/src/main/java/org/apache/hudi/io/compress/CompressionCodec.java
b/hudi-io/src/main/java/org/apache/hudi/io/compress/CompressionCodec.java
index d9c933cdc08..6ddf004b94f 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/compress/CompressionCodec.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/compress/CompressionCodec.java
@@ -19,26 +19,89 @@
package org.apache.hudi.io.compress;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
/**
 * Available compression codecs.
 * There should not be any assumption on the ordering or ordinal of the defined enums.
 */
public enum CompressionCodec {
  NONE("none", 2),
  BZIP2("bz2", 5),
  GZIP("gz", 1),
  LZ4("lz4", 4),
  LZO("lzo", 0),
  SNAPPY("snappy", 3),
  ZSTD("zstd", 6);

  private static final Map<String, CompressionCodec>
      NAME_TO_COMPRESSION_CODEC_MAP = createNameToCompressionCodecMap();
  private static final Map<Integer, CompressionCodec>
      ID_TO_COMPRESSION_CODEC_MAP = createIdToCompressionCodecMap();

  private final String name;
  // CompressionCodec ID to be stored in HFile on storage.
  // The ID of each codec cannot change or else that breaks all existing HFiles out there,
  // even the ones that are not compressed! (They use the NONE codec.)
  private final int id;

  CompressionCodec(final String name, int id) {
    this.name = name;
    this.id = id;
  }

  public String getName() {
    return name;
  }

  public int getId() {
    return id;
  }

  /**
   * Finds the compression codec by its case-insensitive name (e.g. "gz", "none").
   *
   * @param name codec name to look up.
   * @return the matching compression codec.
   * @throws IllegalArgumentException if the name does not match any codec.
   */
  public static CompressionCodec findCodecByName(String name) {
    CompressionCodec codec =
        NAME_TO_COMPRESSION_CODEC_MAP.get(name.toLowerCase());
    if (codec == null) {
      // Direct throw (instead of ValidationUtils.checkArgument) keeps this hudi-io
      // enum free of a hudi-common dependency; the exception type is unchanged.
      throw new IllegalArgumentException(
          String.format("Cannot find compression codec: %s", name));
    }
    return codec;
  }

  /**
   * Gets the compression codec based on the ID. This ID is written to the HFile on storage.
   *
   * @param id ID indicating the compression codec
   * @return compression codec based on the ID
   * @throws IllegalArgumentException if the ID does not match any codec.
   */
  public static CompressionCodec decodeCompressionCodec(int id) {
    CompressionCodec codec = ID_TO_COMPRESSION_CODEC_MAP.get(id);
    if (codec == null) {
      // Message typo fixed: "Compression code" -> "Compression codec".
      throw new IllegalArgumentException("Compression codec not found for ID: " + id);
    }
    return codec;
  }

  /**
   * @return the mapping of name to compression codec.
   */
  private static Map<String, CompressionCodec> createNameToCompressionCodecMap() {
    return Collections.unmodifiableMap(
        Arrays.stream(CompressionCodec.values())
            .collect(Collectors.toMap(CompressionCodec::getName, Function.identity()))
    );
  }

  /**
   * @return the mapping of ID to compression codec.
   */
  private static Map<Integer, CompressionCodec> createIdToCompressionCodecMap() {
    return Collections.unmodifiableMap(
        Arrays.stream(CompressionCodec.values())
            .collect(Collectors.toMap(CompressionCodec::getId, Function.identity()))
    );
  }
}
diff --git
a/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieDecompressor.java
b/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressor.java
similarity index 70%
rename from
hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieDecompressor.java
rename to
hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressor.java
index 62be2747003..5ad0f71d12c 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieDecompressor.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressor.java
@@ -21,11 +21,12 @@ package org.apache.hudi.io.compress;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
/**
- * Provides decompression on input data.
+ * Provides compression and decompression on input data.
*/
-public interface HoodieDecompressor {
+public interface HoodieCompressor {
/**
* Decompresses the data from {@link InputStream} and writes the
decompressed data to the target
* byte array.
@@ -41,4 +42,22 @@ public interface HoodieDecompressor {
byte[] targetByteArray,
int offset,
int length) throws IOException;
+
+ /**
+ * Compresses data stored in byte array.
+ *
+ * @param uncompressedBytes input data in byte array
+ * @return output data in byte array
+ * @throws IOException upon error
+ */
+ byte[] compress(byte[] uncompressedBytes) throws IOException;
+
+ /**
+ * Compresses data stored in {@link ByteBuffer}.
+ *
+ * @param uncompressedBytes input data in {@link ByteBuffer}
+ * @return output data in {@link ByteBuffer}
+ * @throws IOException upon error
+ */
+ ByteBuffer compress(ByteBuffer uncompressedBytes) throws IOException;
}
diff --git
a/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieDecompressorFactory.java
b/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressorFactory.java
similarity index 65%
rename from
hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieDecompressorFactory.java
rename to
hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressorFactory.java
index af50b094079..9d56e69d705 100644
---
a/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieDecompressorFactory.java
+++
b/hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressorFactory.java
@@ -19,22 +19,25 @@
package org.apache.hudi.io.compress;
-import org.apache.hudi.io.compress.airlift.HoodieAirliftGzipDecompressor;
-import org.apache.hudi.io.compress.builtin.HoodieNoneDecompressor;
+import org.apache.hudi.io.compress.airlift.HoodieAirliftGzipCompressor;
+import org.apache.hudi.io.compress.builtin.HoodieNoneCompressor;
/**
- * Factory for {@link HoodieDecompressor}.
+ * Factory for {@link HoodieCompressor}.
*/
-public class HoodieDecompressorFactory {
- public static HoodieDecompressor getDecompressor(CompressionCodec
compressionCodec) {
+public class HoodieCompressorFactory {
+ private HoodieCompressorFactory() {
+ }
+
+ public static HoodieCompressor getCompressor(CompressionCodec
compressionCodec) {
switch (compressionCodec) {
case NONE:
- return new HoodieNoneDecompressor();
+ return new HoodieNoneCompressor();
case GZIP:
- return new HoodieAirliftGzipDecompressor();
+ return new HoodieAirliftGzipCompressor();
default:
throw new IllegalArgumentException(
- "The decompression is not supported for compression codec: " +
compressionCodec);
+ "The compressor is not supported for compression codec: " +
compressionCodec);
}
}
}
diff --git
a/hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipDecompressor.java
b/hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipCompressor.java
similarity index 63%
rename from
hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipDecompressor.java
rename to
hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipCompressor.java
index 15c2ff3f827..2216f581477 100644
---
a/hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipDecompressor.java
+++
b/hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipCompressor.java
@@ -20,24 +20,27 @@
package org.apache.hudi.io.compress.airlift;
import org.apache.hudi.io.compress.CompressionCodec;
-import org.apache.hudi.io.compress.HoodieDecompressor;
+import org.apache.hudi.io.compress.HoodieCompressor;
import io.airlift.compress.gzip.JdkGzipHadoopStreams;
import io.airlift.compress.hadoop.HadoopInputStream;
+import io.airlift.compress.hadoop.HadoopOutputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import static org.apache.hudi.io.util.IOUtils.readFully;
/**
- * Implementation of {@link HoodieDecompressor} for {@link
CompressionCodec#GZIP} compression
+ * Implementation of {@link HoodieCompressor} for {@link
CompressionCodec#GZIP} compression
* codec using airlift aircompressor's GZIP decompressor.
*/
-public class HoodieAirliftGzipDecompressor implements HoodieDecompressor {
+public class HoodieAirliftGzipCompressor implements HoodieCompressor {
private final JdkGzipHadoopStreams gzipStreams;
- public HoodieAirliftGzipDecompressor() {
+ public HoodieAirliftGzipCompressor() {
gzipStreams = new JdkGzipHadoopStreams();
}
@@ -50,4 +53,20 @@ public class HoodieAirliftGzipDecompressor implements
HoodieDecompressor {
return readFully(stream, targetByteArray, offset, length);
}
}
+
+ @Override
+ public byte[] compress(byte[] data) throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ try (HadoopOutputStream gzipOutputStream =
gzipStreams.createOutputStream(byteArrayOutputStream)) {
+ gzipOutputStream.write(data);
+ }
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ @Override
+ public ByteBuffer compress(ByteBuffer uncompressedBytes) throws IOException {
+ byte[] temp = new byte[uncompressedBytes.remaining()];
+ uncompressedBytes.get(temp);
+ return ByteBuffer.wrap(this.compress(temp));
+ }
}
diff --git
a/hudi-io/src/main/java/org/apache/hudi/io/compress/builtin/HoodieNoneDecompressor.java
b/hudi-io/src/main/java/org/apache/hudi/io/compress/builtin/HoodieNoneCompressor.java
similarity index 74%
rename from
hudi-io/src/main/java/org/apache/hudi/io/compress/builtin/HoodieNoneDecompressor.java
rename to
hudi-io/src/main/java/org/apache/hudi/io/compress/builtin/HoodieNoneCompressor.java
index d702201c6dd..a402d29bb52 100644
---
a/hudi-io/src/main/java/org/apache/hudi/io/compress/builtin/HoodieNoneDecompressor.java
+++
b/hudi-io/src/main/java/org/apache/hudi/io/compress/builtin/HoodieNoneCompressor.java
@@ -20,18 +20,19 @@
package org.apache.hudi.io.compress.builtin;
import org.apache.hudi.io.compress.CompressionCodec;
-import org.apache.hudi.io.compress.HoodieDecompressor;
+import org.apache.hudi.io.compress.HoodieCompressor;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import static org.apache.hudi.io.util.IOUtils.readFully;
/**
- * Implementation of {@link HoodieDecompressor} for {@link
CompressionCodec#NONE} compression
+ * Implementation of {@link HoodieCompressor} for {@link
CompressionCodec#NONE} compression
* codec (no compression) by directly reading the input stream.
*/
-public class HoodieNoneDecompressor implements HoodieDecompressor {
+public class HoodieNoneCompressor implements HoodieCompressor {
@Override
public int decompress(InputStream compressedInput,
byte[] targetByteArray,
@@ -39,4 +40,14 @@ public class HoodieNoneDecompressor implements
HoodieDecompressor {
int length) throws IOException {
return readFully(compressedInput, targetByteArray, offset, length);
}
+
+ @Override
+ public byte[] compress(byte[] uncompressedBytes) {
+ return uncompressedBytes;
+ }
+
+ @Override
+ public ByteBuffer compress(ByteBuffer uncompressedBytes) throws IOException {
+ return uncompressedBytes;
+ }
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/ChecksumType.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/ChecksumType.java
new file mode 100644
index 00000000000..1642370f161
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/ChecksumType.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
/**
 * Type of checksum used to validate the integrity of a data block.
 * It determines the number of bytes used for checksum.
 */
public enum ChecksumType {

  NULL((byte) 0),

  CRC32((byte) 1),

  CRC32C((byte) 2);

  // On-disk code identifying the checksum type; must not change across releases.
  private final byte code;

  ChecksumType(final byte c) {
    this.code = c;
  }

  /**
   * @return the default checksum type for newly written blocks.
   */
  public static ChecksumType getDefaultChecksumType() {
    return ChecksumType.CRC32C;
  }

  /**
   * @return the name of this checksum type.
   */
  public String getName() {
    // Each constant's name ("NULL", "CRC32", "CRC32C") is the checksum name itself,
    // so a single concrete method replaces the per-constant anonymous overrides.
    return name();
  }

  public byte getCode() {
    return this.code;
  }

  /**
   * Uses the designated byte value to look up the checksum type.
   *
   * @param b on-disk code of a checksum type.
   * @return type associated with the passed code.
   * @throws IllegalArgumentException if the code is unknown.
   */
  public static ChecksumType codeToType(final byte b) {
    for (ChecksumType t : ChecksumType.values()) {
      if (t.getCode() == b) {
        return t;
      }
    }
    // IllegalArgumentException (a RuntimeException subclass) is the idiomatic type
    // for an invalid argument; existing catch blocks for RuntimeException still work.
    throw new IllegalArgumentException("Unknown checksum type code " + b);
  }

  /**
   * Maps a checksum name to a specific type.
   *
   * @param name checksum type name.
   * @return type associated with the name.
   * @throws IllegalArgumentException if the name is unknown.
   */
  public static ChecksumType nameToType(final String name) {
    for (ChecksumType t : ChecksumType.values()) {
      if (t.getName().equals(name)) {
        return t;
      }
    }
    throw new IllegalArgumentException("Unknown checksum type name " + name);
  }
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlock.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlock.java
index 1723b482a95..14290362e48 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlock.java
@@ -19,11 +19,16 @@
package org.apache.hudi.io.hfile;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.compress.CompressionCodec;
+import com.google.protobuf.CodedOutputStream;
+
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import static org.apache.hudi.io.hfile.DataSize.MAGIC_LENGTH;
import static org.apache.hudi.io.hfile.DataSize.SIZEOF_BYTE;
@@ -43,9 +48,10 @@ public abstract class HFileBlock {
// followed by another 4 byte value to store sizeofDataOnDisk.
public static final int HFILEBLOCK_HEADER_SIZE =
HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM + SIZEOF_BYTE + 2 * SIZEOF_INT32;
-
// Each checksum value is an integer that can be stored in 4 bytes.
static final int CHECKSUM_SIZE = SIZEOF_INT32;
+ private static final int DEFAULT_BYTES_PER_CHECKSUM = 16 * 1024;
+ private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
static class Header {
// Format of header is:
@@ -80,6 +86,13 @@ public abstract class HFileBlock {
protected byte[] compressedByteBuff;
protected int startOffsetInCompressedBuff;
+ // Write properties
+ private long startOffsetInBuffForWrite = -1;
+ private long previousBlockOffsetForWrite = -1;
+
+ /**
+ * Initialize HFileBlock for read.
+ */
protected HFileBlock(HFileContext context,
HFileBlockType blockType,
byte[] byteBuff,
@@ -107,6 +120,25 @@ public abstract class HFileBlock {
this.startOffsetInBuff + HFILEBLOCK_HEADER_SIZE +
uncompressedSizeWithoutHeader;
}
+ /**
+ * Initialize HFileBlock for write.
+ */
+ protected HFileBlock(HFileContext context,
+ HFileBlockType blockType,
+ long previousBlockOffsetForWrite) {
+ this.context = context;
+ this.blockType = blockType;
+ this.previousBlockOffsetForWrite = previousBlockOffsetForWrite;
+ // Set other reader properties to invalid values
+ this.byteBuff = null;
+ this.startOffsetInBuff = -1;
+ this.sizeCheckSum = -1;
+ this.uncompressedEndOffset = -1;
+ this.onDiskSizeWithoutHeader = -1;
+ this.uncompressedSizeWithoutHeader = -1;
+ this.bytesPerChecksum = -1;
+ }
+
/**
* Parses the HFile block header and returns the {@link HFileBlock} instance
based on the input.
*
@@ -196,7 +228,7 @@ public abstract class HFileBlock {
compressedByteBuff, startOffsetInCompressedBuff, byteBuff, 0,
HFILEBLOCK_HEADER_SIZE);
try (InputStream byteBuffInputStream = new ByteArrayInputStream(
compressedByteBuff, startOffsetInCompressedBuff +
HFILEBLOCK_HEADER_SIZE, onDiskSizeWithoutHeader)) {
- context.getDecompressor().decompress(
+ context.getCompressor().decompress(
byteBuffInputStream,
byteBuff,
HFILEBLOCK_HEADER_SIZE,
@@ -217,4 +249,91 @@ public abstract class HFileBlock {
int capacity = HFILEBLOCK_HEADER_SIZE + uncompressedSizeWithoutHeader +
sizeCheckSum;
return new byte[capacity];
}
+
+ // ================ Below are for Write ================
+
+ /**
+ * Returns serialized "data" part of the block.
+ * This function must be implemented by each block type separately.
+ */
+ protected abstract ByteBuffer getUncompressedBlockDataToWrite();
+
+ /**
+ * Return serialized block including header, data, checksum.
+ */
+ public ByteBuffer serialize() throws IOException {
+ // Block payload.
+ ByteBuffer uncompressedBlockData = getUncompressedBlockDataToWrite();
+ // Compress if specified.
+ ByteBuffer compressedBlockData =
context.getCompressor().compress(uncompressedBlockData);
+ // Buffer for building block.
+ ByteBuffer buf = ByteBuffer.allocate(Math.max(
+ context.getBlockSize() * 2,
+ compressedBlockData.limit() + HFILEBLOCK_HEADER_SIZE * 2));
+
+ // Block header
+ // 1. Magic is always 8 bytes.
+ buf.put(blockType.getMagic(), 0, 8);
+ // 2. onDiskSizeWithoutHeader.
+ buf.putInt(compressedBlockData.limit());
+ // 3. uncompressedSizeWithoutHeader.
+ buf.putInt(uncompressedBlockData.limit());
+ // 4. Previous block offset.
+ buf.putLong(previousBlockOffsetForWrite);
+ // 5. Checksum type.
+ buf.put(context.getChecksumType().getCode());
+ // 6. Bytes covered per checksum.
+ buf.putInt(DEFAULT_BYTES_PER_CHECKSUM);
+ // 7. onDiskDataSizeWithHeader
+ int onDiskDataSizeWithHeader =
+ HFileBlock.HFILEBLOCK_HEADER_SIZE + compressedBlockData.limit();
+ buf.putInt(onDiskDataSizeWithHeader);
+ // 8. Payload.
+ buf.put(compressedBlockData);
+ // 9. Checksum.
+ buf.put(generateChecksumBytes(context.getChecksumType()));
+
+ // Update sizes
+ buf.flip();
+ return buf;
+ }
+
+ /**
+ * Sets start offset of the block in the buffer.
+ */
+ protected void setStartOffsetInBuffForWrite(long startOffsetInBuffForWrite) {
+ this.startOffsetInBuffForWrite = startOffsetInBuffForWrite;
+ }
+
+ /**
+ * Gets start offset of the block in the buffer.
+ */
+ protected long getStartOffsetInBuffForWrite() {
+ return this.startOffsetInBuffForWrite;
+ }
+
+ /**
+ * Returns checksum bytes if checksum type is not NULL.
+ * Note that current HFileReaderImpl does not support non-NULL checksum.
+ */
+ private byte[] generateChecksumBytes(ChecksumType type) {
+ if (type == ChecksumType.NULL) {
+ return EMPTY_BYTE_ARRAY;
+ }
+ throw new HoodieException("Only NULL checksum type is supported");
+ }
+
+ /**
+ * Returns the bytes of the variable length encoding for an integer.
+ * @param length an integer, normally representing a length.
+ * @return variable length encoding.
+ * @throws IOException upon error.
+ */
+ static byte[] getVariableLengthEncodedBytes(int length) throws IOException {
+ ByteArrayOutputStream varintBuffer = new ByteArrayOutputStream();
+ CodedOutputStream varintOutput =
CodedOutputStream.newInstance(varintBuffer);
+ varintOutput.writeUInt32NoTag(length);
+ varintOutput.flush();
+ return varintBuffer.toByteArray();
+ }
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockType.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockType.java
index 284988cc4cd..ee800a1152c 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockType.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockType.java
@@ -168,4 +168,8 @@ public enum HFileBlockType {
+ new String(magic) + ", got " + new String(buf));
}
}
+
+ public byte[] getMagic() {
+ return magic;
+ }
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileContext.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileContext.java
index d47daef30ec..d2864e21ac5 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileContext.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileContext.java
@@ -20,27 +20,39 @@
package org.apache.hudi.io.hfile;
import org.apache.hudi.io.compress.CompressionCodec;
-import org.apache.hudi.io.compress.HoodieDecompressor;
-import org.apache.hudi.io.compress.HoodieDecompressorFactory;
+import org.apache.hudi.io.compress.HoodieCompressor;
+import org.apache.hudi.io.compress.HoodieCompressorFactory;
/**
* The context of HFile that contains information of the blocks.
*/
public class HFileContext {
private final CompressionCodec compressionCodec;
- private final HoodieDecompressor decompressor;
+ private final HoodieCompressor compressor;
+ private final ChecksumType checksumType;
+ private final int blockSize;
- private HFileContext(CompressionCodec compressionCodec) {
+ private HFileContext(CompressionCodec compressionCodec, int blockSize,
ChecksumType checksumType) {
this.compressionCodec = compressionCodec;
- this.decompressor =
HoodieDecompressorFactory.getDecompressor(compressionCodec);
+ this.compressor = HoodieCompressorFactory.getCompressor(compressionCodec);
+ this.blockSize = blockSize;
+ this.checksumType = checksumType;
}
CompressionCodec getCompressionCodec() {
return compressionCodec;
}
- HoodieDecompressor getDecompressor() {
- return decompressor;
+ HoodieCompressor getCompressor() {
+ return compressor;
+ }
+
+ int getBlockSize() {
+ return blockSize;
+ }
+
+ ChecksumType getChecksumType() {
+ return checksumType;
}
public static Builder builder() {
@@ -49,8 +61,12 @@ public class HFileContext {
public static class Builder {
private CompressionCodec compressionCodec = CompressionCodec.NONE;
+ private int blockSize = 1024 * 1024;
+ private ChecksumType checksumType = ChecksumType.NULL;
- public Builder() {
+ public Builder blockSize(int blockSize) {
+ this.blockSize = blockSize;
+ return this;
}
public Builder compressionCodec(CompressionCodec compressionCodec) {
@@ -58,8 +74,13 @@ public class HFileContext {
return this;
}
+ public Builder checksumType(ChecksumType checksumType) {
+ this.checksumType = checksumType;
+ return this;
+ }
+
public HFileContext build() {
- return new HFileContext(compressionCodec);
+ return new HFileContext(compressionCodec, blockSize, checksumType);
}
}
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileDataBlock.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileDataBlock.java
index b7b152d4773..7f62b1c5416 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileDataBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileDataBlock.java
@@ -21,6 +21,12 @@ package org.apache.hudi.io.hfile;
import org.apache.hudi.common.util.Option;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hudi.io.hfile.DataSize.SIZEOF_INT16;
import static
org.apache.hudi.io.hfile.HFileReader.SEEK_TO_BEFORE_BLOCK_FIRST_KEY;
import static org.apache.hudi.io.hfile.HFileReader.SEEK_TO_FOUND;
import static org.apache.hudi.io.hfile.HFileReader.SEEK_TO_IN_RANGE;
@@ -38,7 +44,9 @@ public class HFileDataBlock extends HFileBlock {
// End offset of content in the block, relative to the start of the start of
the block
protected final int uncompressedContentEndRelativeOffset;
+ private final List<KeyValueEntry> entriesToWrite = new ArrayList<>();
+ // For read purpose.
protected HFileDataBlock(HFileContext context,
byte[] byteBuff,
int startOffsetInBuff) {
@@ -48,6 +56,18 @@ public class HFileDataBlock extends HFileBlock {
this.uncompressedEndOffset - this.sizeCheckSum -
this.startOffsetInBuff;
}
+ // For write purpose.
+ private HFileDataBlock(HFileContext context, long previousBlockOffset) {
+ super(context, HFileBlockType.DATA, previousBlockOffset);
+ // This is not used for write.
+ uncompressedContentEndRelativeOffset = -1;
+ }
+
+ static HFileDataBlock createDataBlockToWrite(HFileContext context,
+ long previousBlockOffset) {
+ return new HFileDataBlock(context, previousBlockOffset);
+ }
+
/**
* Seeks to the key to look up. The key may not have an exact match.
*
@@ -70,7 +90,7 @@ public class HFileDataBlock extends HFileBlock {
* of the data block; the cursor points to the actual first key of the data
block which is
* lexicographically greater than the lookup key.
*/
- public int seekTo(HFileCursor cursor, Key key, int blockStartOffsetInFile) {
+ int seekTo(HFileCursor cursor, Key key, int blockStartOffsetInFile) {
int relativeOffset = cursor.getOffset() - blockStartOffsetInFile;
int lastRelativeOffset = relativeOffset;
Option<KeyValue> lastKeyValue = cursor.getKeyValue();
@@ -123,7 +143,7 @@ public class HFileDataBlock extends HFileBlock {
* @param offset offset to read relative to the start of {@code byteBuff}.
* @return the {@link KeyValue} instance.
*/
- public KeyValue readKeyValue(int offset) {
+ KeyValue readKeyValue(int offset) {
return new KeyValue(byteBuff, offset);
}
@@ -135,7 +155,7 @@ public class HFileDataBlock extends HFileBlock {
* HFile.
* @return {@code true} if there is next {@link KeyValue}; {code false}
otherwise.
*/
- public boolean next(HFileCursor cursor, int blockStartOffsetInFile) {
+ boolean next(HFileCursor cursor, int blockStartOffsetInFile) {
int offset = cursor.getOffset() - blockStartOffsetInFile;
Option<KeyValue> keyValue = cursor.getKeyValue();
if (!keyValue.isPresent()) {
@@ -149,4 +169,56 @@ public class HFileDataBlock extends HFileBlock {
private boolean isAtFirstKey(int relativeOffset) {
return relativeOffset == HFILEBLOCK_HEADER_SIZE;
}
+
+ // ================ Below are for Write ================
+
+ boolean isEmpty() {
+ return entriesToWrite.isEmpty();
+ }
+
  /**
   * Stages a key-value pair for write.
   *
   * @param key   serialized key bytes.
   * @param value serialized value bytes.
   */
  void add(byte[] key, byte[] value) {
    KeyValueEntry kv = new KeyValueEntry(key, value);
    // Assume all entries are sorted before write.
    entriesToWrite.add(kv);
  }
+
+ int getNumOfEntries() {
+ return entriesToWrite.size();
+ }
+
  /**
   * @return the key bytes of the first staged entry.
   *     NOTE(review): unlike {@code getLastKeyContent()}, this throws
   *     {@code IndexOutOfBoundsException} when no entries have been added —
   *     confirm callers only invoke it on non-empty blocks.
   */
  byte[] getFirstKey() {
    return entriesToWrite.get(0).key;
  }
+
  /**
   * @return the key bytes of the last staged entry, or an empty array if no
   *     entries have been added.
   */
  byte[] getLastKeyContent() {
    if (entriesToWrite.isEmpty()) {
      return new byte[0];
    }
    return entriesToWrite.get(entriesToWrite.size() - 1).key;
  }
+
+ @Override
+ protected ByteBuffer getUncompressedBlockDataToWrite() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ByteBuffer dataBuf = ByteBuffer.allocate(context.getBlockSize());
+ for (KeyValueEntry kv : entriesToWrite) {
+ // Length of key + length of a short variable indicating length of key.
+ dataBuf.putInt(kv.key.length + SIZEOF_INT16);
+ // Length of value.
+ dataBuf.putInt(kv.value.length);
+ // Key content length.
+ dataBuf.putShort((short)kv.key.length);
+ // Key.
+ dataBuf.put(kv.key);
+ // Value.
+ dataBuf.put(kv.value);
+ // MVCC.
+ dataBuf.put((byte)0);
+ // Copy to output stream.
+ baos.write(dataBuf.array(), 0, dataBuf.position());
+ // Clear the buffer.
+ dataBuf.clear();
+ }
+ return ByteBuffer.wrap(baos.toByteArray());
+ }
}
diff --git
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileFileInfoBlock.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileFileInfoBlock.java
index e0b93201924..2f9918f6d07 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileFileInfoBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileFileInfoBlock.java
@@ -22,12 +22,16 @@ package org.apache.hudi.io.hfile;
import org.apache.hudi.io.hfile.protobuf.generated.HFileProtos;
import org.apache.hudi.io.util.IOUtils;
+import com.google.protobuf.ByteString;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
/**
* Represents a {@link HFileBlockType#FILE_INFO} block.
@@ -35,6 +39,8 @@ import static
org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
public class HFileFileInfoBlock extends HFileBlock {
// Magic we put ahead of a serialized protobuf message
public static final byte[] PB_MAGIC = new byte[] {'P', 'B', 'U', 'F'};
+ // Write properties
+ private final Map<String, byte[]> fileInfoToWrite = new HashMap<>();
public HFileFileInfoBlock(HFileContext context,
byte[] byteBuff,
@@ -42,6 +48,14 @@ public class HFileFileInfoBlock extends HFileBlock {
super(context, HFileBlockType.FILE_INFO, byteBuff, startOffsetInBuff);
}
+ private HFileFileInfoBlock(HFileContext context) {
+ super(context, HFileBlockType.FILE_INFO, -1L);
+ }
+
+ public static HFileFileInfoBlock createFileInfoBlockToWrite(HFileContext
context) {
+ return new HFileFileInfoBlock(context);
+ }
+
public HFileInfo readFileInfo() throws IOException {
int pbMagicLength = PB_MAGIC.length;
if (IOUtils.compareTo(PB_MAGIC, 0, pbMagicLength,
@@ -61,4 +75,35 @@ public class HFileFileInfoBlock extends HFileBlock {
}
return new HFileInfo(fileInfoMap);
}
+
+ // ================ Below are for Write ================
+
+ public void add(String name, byte[] value) {
+ fileInfoToWrite.put(name, value);
+ }
+
+ @Override
+ public ByteBuffer getUncompressedBlockDataToWrite() {
+ ByteBuffer buff = ByteBuffer.allocate(context.getBlockSize() * 2);
+ HFileProtos.InfoProto.Builder builder =
+ HFileProtos.InfoProto.newBuilder();
+ for (Map.Entry<String, byte[]> e : fileInfoToWrite.entrySet()) {
+ HFileProtos.BytesBytesPair bbp = HFileProtos.BytesBytesPair
+ .newBuilder()
+ .setFirst(ByteString.copyFrom(getUTF8Bytes(e.getKey())))
+ .setSecond(ByteString.copyFrom(e.getValue()))
+ .build();
+ builder.addMapEntry(bbp);
+ }
+ buff.put(PB_MAGIC);
+ byte[] payload = builder.build().toByteArray();
+ try {
+ buff.put(getVariableLengthEncodedBytes(payload.length));
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to calculate File Info variable
length");
+ }
+ buff.put(payload);
+ buff.flip();
+ return buff;
+ }
}
diff --git
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileIndexBlock.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileIndexBlock.java
new file mode 100644
index 00000000000..72ddd6edf21
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileIndexBlock.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class HFileIndexBlock extends HFileBlock {
+ protected final List<BlockIndexEntry> entries = new ArrayList<>();
+ protected long blockDataSize = -1L;
+
+ protected HFileIndexBlock(HFileContext context,
+ HFileBlockType blockType,
+ byte[] byteBuff,
+ int startOffsetInBuff) {
+ super(context, blockType, byteBuff, startOffsetInBuff);
+ }
+
+ protected HFileIndexBlock(HFileContext context,
+ HFileBlockType blockType) {
+ super(context, blockType, -1L);
+ }
+
+ public void add(byte[] firstKey, long offset, int size) {
+ Key key = new Key(firstKey);
+ entries.add(new BlockIndexEntry(key, Option.empty(), offset, size));
+ }
+
+ public int getNumOfEntries() {
+ return entries.size();
+ }
+
+ public boolean isEmpty() {
+ return entries.isEmpty();
+ }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileInfo.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileInfo.java
index adc7c312936..b2c8e141675 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileInfo.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileInfo.java
@@ -29,13 +29,13 @@ import java.util.Map;
*/
public class HFileInfo {
private static final String RESERVED_PREFIX = "hfile.";
- private static final UTF8StringKey LAST_KEY =
+ static final UTF8StringKey LAST_KEY =
new UTF8StringKey(RESERVED_PREFIX + "LASTKEY");
private static final UTF8StringKey FILE_CREATION_TIME_TS =
new UTF8StringKey(RESERVED_PREFIX + "CREATE_TIME_TS");
private static final UTF8StringKey KEY_VALUE_VERSION =
new UTF8StringKey("KEY_VALUE_VERSION");
- private static final UTF8StringKey MAX_MVCC_TS_KEY =
+ static final UTF8StringKey MAX_MVCC_TS_KEY =
new UTF8StringKey("MAX_MEMSTORE_TS_KEY");
private static final int KEY_VALUE_VERSION_WITH_MVCC_TS = 1;
diff --git
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileLeafIndexBlock.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileLeafIndexBlock.java
index e664145bbc1..a66a85c5b9e 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileLeafIndexBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileLeafIndexBlock.java
@@ -21,7 +21,9 @@ package org.apache.hudi.io.hfile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -47,6 +49,11 @@ public class HFileLeafIndexBlock extends HFileBlock {
super(context, blockType, byteBuff, startOffsetInBuff);
}
+ @Override
+ protected ByteBuffer getUncompressedBlockDataToWrite() {
+ throw new HoodieException("HFile writer does not support leaf index
block");
+ }
+
/**
* Reads the index block and returns the block index entries.
*/
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
index 67ab0963824..84ffa70f6ff 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
@@ -25,15 +25,43 @@ import java.nio.ByteBuffer;
* Represents a {@link HFileBlockType#META} block.
*/
public class HFileMetaBlock extends HFileBlock {
+ protected KeyValueEntry entryToWrite;
+
protected HFileMetaBlock(HFileContext context,
byte[] byteBuff,
int startOffsetInBuff) {
super(context, HFileBlockType.META, byteBuff, startOffsetInBuff);
}
+ private HFileMetaBlock(HFileContext context, KeyValueEntry keyValueEntry) {
+ super(context, HFileBlockType.META, -1L);
+ this.entryToWrite = keyValueEntry;
+ }
+
+ static HFileMetaBlock createMetaBlockToWrite(HFileContext context,
+ KeyValueEntry keyValueEntry) {
+ return new HFileMetaBlock(context, keyValueEntry);
+ }
+
public ByteBuffer readContent() {
return ByteBuffer.wrap(
getByteBuff(),
startOffsetInBuff + HFILEBLOCK_HEADER_SIZE,
uncompressedSizeWithoutHeader);
}
+
+ // ================ Below are for Write ================
+
+ public byte[] getFirstKey() {
+ return entryToWrite.key;
+ }
+
+ @Override
+ public ByteBuffer getUncompressedBlockDataToWrite() {
+ ByteBuffer dataBuf = ByteBuffer.allocate(context.getBlockSize());
+ // Note that: only the value should be stored in this block.
+ // The key is stored in the meta index block.
+ dataBuf.put(entryToWrite.value);
+ dataBuf.flip();
+ return dataBuf;
+ }
}
diff --git
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaIndexBlock.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaIndexBlock.java
new file mode 100644
index 00000000000..03159699149
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaIndexBlock.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class HFileMetaIndexBlock extends HFileIndexBlock {
+
+ private HFileMetaIndexBlock(HFileContext context) {
+ super(context, HFileBlockType.ROOT_INDEX);
+ }
+
+ public static HFileMetaIndexBlock createMetaIndexBlockToWrite(HFileContext
context) {
+ return new HFileMetaIndexBlock(context);
+ }
+
+ @Override
+ public ByteBuffer getUncompressedBlockDataToWrite() {
+ ByteBuffer buf = ByteBuffer.allocate(context.getBlockSize() * 2);
+ for (BlockIndexEntry entry : entries) {
+ buf.putLong(entry.getOffset());
+ buf.putInt(entry.getSize());
+ // Key length.
+ try {
+ byte[] keyLength =
getVariableLengthEncodedBytes(entry.getFirstKey().getLength());
+ buf.put(keyLength);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to serialize number: " + entry.getFirstKey().getLength());
+ }
+ // Note that: NO two bytes are used here for encoding the key length.
+ // Key.
+ buf.put(entry.getFirstKey().getBytes());
+ }
+ buf.flip();
+
+ // Set metrics.
+ blockDataSize = buf.limit();
+ return buf;
+ }
+}
diff --git
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
index d176af6d251..b3be1e7d6db 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
@@ -264,6 +264,14 @@ public class HFileReaderImpl implements HFileReader {
stream.close();
}
+ HFileTrailer getTrailer() {
+ return trailer;
+ }
+
+ Map<Key, BlockIndexEntry> getDataBlockIndexMap() {
+ return dataBlockIndexEntryMap;
+ }
+
/**
* Reads and parses the HFile trailer.
*
diff --git
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileRootIndexBlock.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileRootIndexBlock.java
index 099e42ad44c..19ae69a8517 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileRootIndexBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileRootIndexBlock.java
@@ -21,10 +21,14 @@ package org.apache.hudi.io.hfile;
import org.apache.hudi.common.util.Option;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
+import static org.apache.hudi.io.hfile.DataSize.SIZEOF_INT16;
import static org.apache.hudi.io.util.IOUtils.copy;
import static org.apache.hudi.io.util.IOUtils.decodeVarLongSizeOnDisk;
import static org.apache.hudi.io.util.IOUtils.readInt;
@@ -34,13 +38,21 @@ import static org.apache.hudi.io.util.IOUtils.readVarLong;
/**
* Represents a {@link HFileBlockType#ROOT_INDEX} block.
*/
-public class HFileRootIndexBlock extends HFileBlock {
+public class HFileRootIndexBlock extends HFileIndexBlock {
public HFileRootIndexBlock(HFileContext context,
byte[] byteBuff,
int startOffsetInBuff) {
super(context, HFileBlockType.ROOT_INDEX, byteBuff, startOffsetInBuff);
}
+ private HFileRootIndexBlock(HFileContext context) {
+ super(context, HFileBlockType.ROOT_INDEX);
+ }
+
+ public static HFileRootIndexBlock createRootIndexBlockToWrite(HFileContext
context) {
+ return new HFileRootIndexBlock(context);
+ }
+
/**
* Reads the index block and returns the block index entry to an in-memory
{@link TreeMap}
* for searches.
@@ -86,4 +98,37 @@ public class HFileRootIndexBlock extends HFileBlock {
}
return indexEntryList;
}
+
+ @Override
+ public ByteBuffer getUncompressedBlockDataToWrite() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ByteBuffer buf = ByteBuffer.allocate(context.getBlockSize());
+ for (BlockIndexEntry entry : entries) {
+ buf.putLong(entry.getOffset());
+ buf.putInt(entry.getSize());
+
+ // Key length + 2.
+ try {
+ byte[] keyLength = getVariableLengthEncodedBytes(
+ entry.getFirstKey().getLength() + SIZEOF_INT16);
+ buf.put(keyLength);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to serialize number: " + entry.getFirstKey().getLength() +
SIZEOF_INT16);
+ }
+ // Key length.
+ buf.putShort((short) entry.getFirstKey().getLength());
+ // Key.
+ buf.put(entry.getFirstKey().getBytes());
+ // Copy to output stream.
+ baos.write(buf.array(), 0, buf.position());
+ // Clear the buffer.
+ buf.clear();
+ }
+
+ // Output all data in a buffer.
+ byte[] allData = baos.toByteArray();
+ blockDataSize = allData.length;
+ return ByteBuffer.wrap(allData);
+ }
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileTrailer.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileTrailer.java
index 7aff7d2c830..474ab71a2ae 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileTrailer.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileTrailer.java
@@ -28,7 +28,6 @@ import java.util.Arrays;
import static org.apache.hudi.io.hfile.DataSize.MAGIC_LENGTH;
import static org.apache.hudi.io.hfile.DataSize.SIZEOF_INT32;
-import static org.apache.hudi.io.hfile.HFileUtils.decodeCompressionCodec;
/**
* Represents a HFile trailer, which is serialized and deserialized using
@@ -159,7 +158,7 @@ public class HFileTrailer {
comparatorClassName = trailerProto.getComparatorClassName();
}
if (trailerProto.hasCompressionCodec()) {
- compressionCodec =
decodeCompressionCodec(trailerProto.getCompressionCodec());
+ compressionCodec =
CompressionCodec.decodeCompressionCodec(trailerProto.getCompressionCodec());
} else {
compressionCodec = CompressionCodec.NONE;
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileUtils.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileUtils.java
index bd3568d0b2d..1ded713c59d 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileUtils.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileUtils.java
@@ -19,35 +19,14 @@
package org.apache.hudi.io.hfile;
-import org.apache.hudi.io.compress.CompressionCodec;
import org.apache.hudi.io.util.IOUtils;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
/**
* Util methods for reading and writing HFile.
*/
public class HFileUtils {
- private static final Map<Integer, CompressionCodec>
HFILE_COMPRESSION_CODEC_MAP = createCompressionCodecMap();
-
- /**
- * Gets the compression codec based on the ID. This ID is written to the
HFile on storage.
- *
- * @param id ID indicating the compression codec.
- * @return compression codec based on the ID.
- */
- public static CompressionCodec decodeCompressionCodec(int id) {
- CompressionCodec codec = HFILE_COMPRESSION_CODEC_MAP.get(id);
- if (codec == null) {
- throw new IllegalArgumentException("Compression code not found for ID: "
+ id);
- }
- return codec;
- }
-
/**
* Reads the HFile major version from the input.
*
@@ -106,23 +85,4 @@ public class HFileUtils {
public static String getValue(KeyValue kv) {
return fromUTF8Bytes(kv.getBytes(), kv.getValueOffset(),
kv.getValueLength());
}
-
- /**
- * The ID mapping cannot change or else that breaks all existing HFiles out
there,
- * even the ones that are not compressed! (They use the NONE algorithm)
- * This is because HFile stores the ID to indicate which compression codec
is used.
- *
- * @return the mapping of ID to compression codec.
- */
- private static Map<Integer, CompressionCodec> createCompressionCodecMap() {
- Map<Integer, CompressionCodec> result = new HashMap<>();
- result.put(0, CompressionCodec.LZO);
- result.put(1, CompressionCodec.GZIP);
- result.put(2, CompressionCodec.NONE);
- result.put(3, CompressionCodec.SNAPPY);
- result.put(4, CompressionCodec.LZ4);
- result.put(5, CompressionCodec.BZIP2);
- result.put(6, CompressionCodec.ZSTD);
- return Collections.unmodifiableMap(result);
- }
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriter.java
similarity index 59%
copy from hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
copy to hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriter.java
index 67ab0963824..617561f0b43 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriter.java
@@ -19,21 +19,27 @@
package org.apache.hudi.io.hfile;
-import java.nio.ByteBuffer;
+import java.io.Closeable;
+import java.io.IOException;
/**
- * Represents a {@link HFileBlockType#META} block.
+ * The interface for HFile writer to implement.
*/
-public class HFileMetaBlock extends HFileBlock {
- protected HFileMetaBlock(HFileContext context,
- byte[] byteBuff,
- int startOffsetInBuff) {
- super(context, HFileBlockType.META, byteBuff, startOffsetInBuff);
- }
+public interface HFileWriter extends Closeable {
+ /**
+ * Append a key-value pair into a data block.
+ * The caller must guarantee that the key lexicographically increments or is
the same
+ * as the last key.
+ */
+ void append(String key, byte[] value) throws IOException;
- public ByteBuffer readContent() {
- return ByteBuffer.wrap(
- getByteBuff(),
- startOffsetInBuff + HFILEBLOCK_HEADER_SIZE,
uncompressedSizeWithoutHeader);
- }
+ /**
+ * Append a piece of file info.
+ */
+ void appendFileInfo(String name, byte[] value);
+
+ /**
+ * Append a piece of meta info.
+ */
+ void appendMetaInfo(String name, byte[] value);
}
diff --git
a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriterImpl.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriterImpl.java
new file mode 100644
index 00000000000..c8153328ca7
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriterImpl.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.io.hfile.protobuf.generated.HFileProtos;
+
+import com.google.protobuf.ByteString;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.io.hfile.DataSize.SIZEOF_INT16;
+import static
org.apache.hudi.io.hfile.HFileBlock.getVariableLengthEncodedBytes;
+import static org.apache.hudi.io.hfile.HFileBlockType.TRAILER;
+import static org.apache.hudi.io.hfile.HFileInfo.LAST_KEY;
+import static org.apache.hudi.io.hfile.HFileInfo.MAX_MVCC_TS_KEY;
+import static org.apache.hudi.io.hfile.HFileTrailer.TRAILER_SIZE;
+
+/**
+ * Pure Java implementation of HFile writer (HFile v3 format) for Hudi.
+ */
+public class HFileWriterImpl implements HFileWriter {
+ private static final String COMPARATOR_CLASS_NAME
+ = "org.apache.hudi.io.storage.HoodieHBaseKVComparator";
+ private static final byte HFILE_VERSION = (byte) 3;
+ private final OutputStream outputStream;
+ private final HFileContext context;
+ // Meta Info map.
+ private final Map<String, byte[]> metaInfo = new HashMap<>();
+ // Data block under construction.
+ private HFileDataBlock currentDataBlock;
+ // Meta block under construction.
+ private final HFileRootIndexBlock rootIndexBlock;
+ private final HFileMetaIndexBlock metaIndexBlock;
+ private final HFileFileInfoBlock fileInfoBlock;
+ private long uncompressedDataBlockBytes;
+ private long totalUncompressedDataBlockBytes;
+ private long currentOffset;
+ private long loadOnOpenSectionOffset;
+ private final int blockSize;
+
+ // Variables used to record necessary information to reduce
+ // the memory usage.
+ private byte[] lastKey = new byte[0];
+ private long firstDataBlockOffset = -1;
+ private long lastDataBlockOffset;
+ private long totalNumberOfRecords = 0;
+
+ public HFileWriterImpl(HFileContext context, OutputStream outputStream) {
+ this.outputStream = outputStream;
+ this.context = context;
+ this.blockSize = this.context.getBlockSize();
+ this.uncompressedDataBlockBytes = 0L;
+ this.totalUncompressedDataBlockBytes = 0L;
+ this.currentOffset = 0L;
+ this.currentDataBlock = HFileDataBlock.createDataBlockToWrite(context,
-1L);
+ this.rootIndexBlock =
HFileRootIndexBlock.createRootIndexBlockToWrite(context);
+ this.metaIndexBlock =
HFileMetaIndexBlock.createMetaIndexBlockToWrite(context);
+ this.fileInfoBlock =
HFileFileInfoBlock.createFileInfoBlockToWrite(context);
+ initFileInfo();
+ }
+
+ // Append a data kv pair.
+ public void append(String key, byte[] value) throws IOException {
+ byte[] keyBytes = StringUtils.getUTF8Bytes(key);
+ lastKey = keyBytes;
+ // Records with the same key must be put into the same block.
+ // Here 9 = 4 bytes of key length + 4 bytes of value length + 1 byte MVCC.
+ if (!Arrays.equals(currentDataBlock.getLastKeyContent(), keyBytes)
+ && uncompressedDataBlockBytes + keyBytes.length + value.length + 9 >
blockSize) {
+ flushCurrentDataBlock();
+ uncompressedDataBlockBytes = 0;
+ }
+ currentDataBlock.add(keyBytes, value);
+ int uncompressedKeyValueSize = keyBytes.length + value.length;
+ uncompressedDataBlockBytes += uncompressedKeyValueSize + 9;
+ totalUncompressedDataBlockBytes += uncompressedKeyValueSize + 9;
+ }
+
+ // Append a metadata kv pair.
+ public void appendMetaInfo(String name, byte[] value) {
+ metaInfo.put(name, value);
+ }
+
+ // Append a file info kv pair.
+ public void appendFileInfo(String name, byte[] value) {
+ fileInfoBlock.add(name, value);
+ }
+
+ @Override
+ public void close() throws IOException {
+ flushCurrentDataBlock();
+ flushMetaBlocks();
+ writeLoadOnOpenSection();
+ writeTrailer();
+ outputStream.flush();
+ outputStream.close();
+ }
+
+ private void flushCurrentDataBlock() throws IOException {
+ // 0. Skip flush if no data.
+ if (currentDataBlock.isEmpty()) {
+ return;
+ }
+ // 1. Update metrics.
+ if (firstDataBlockOffset < 0) {
+ firstDataBlockOffset = currentOffset;
+ }
+ lastDataBlockOffset = currentOffset;
+ totalNumberOfRecords += currentDataBlock.getNumOfEntries();
+ // 2. Flush data block.
+ ByteBuffer blockBuffer = currentDataBlock.serialize();
+ writeBuffer(blockBuffer);
+ // 3. Create an index entry.
+ rootIndexBlock.add(
+ currentDataBlock.getFirstKey(), lastDataBlockOffset,
blockBuffer.limit());
+ // 4. Create a new data block.
+ currentDataBlock = HFileDataBlock.createDataBlockToWrite(context,
currentOffset);
+ }
+
+ // NOTE that: the reader assumes that every meta info piece
+ // should be a separate meta block.
+ private void flushMetaBlocks() throws IOException {
+ for (Map.Entry<String, byte[]> e : metaInfo.entrySet()) {
+ HFileMetaBlock currentMetaBlock =
+ HFileMetaBlock.createMetaBlockToWrite(
+ context, new KeyValueEntry(StringUtils.getUTF8Bytes(e.getKey()),
e.getValue()));
+ ByteBuffer blockBuffer = currentMetaBlock.serialize();
+ long blockOffset = currentOffset;
+ currentMetaBlock.setStartOffsetInBuffForWrite(currentOffset);
+ writeBuffer(blockBuffer);
+ metaIndexBlock.add(
+ currentMetaBlock.getFirstKey(), blockOffset, blockBuffer.limit());
+ }
+ }
+
+ private void writeLoadOnOpenSection() throws IOException {
+ loadOnOpenSectionOffset = currentOffset;
+ // Write Root Data Index
+ ByteBuffer dataIndexBuffer = rootIndexBlock.serialize();
+ rootIndexBlock.setStartOffsetInBuffForWrite(currentOffset);
+ writeBuffer(dataIndexBuffer);
+ // Write Meta Data Index.
+ // Note: Even if this block is empty, it has to be there
+ // due to the behavior of the reader.
+ ByteBuffer metaIndexBuffer = metaIndexBlock.serialize();
+ metaIndexBlock.setStartOffsetInBuffForWrite(currentOffset);
+ writeBuffer(metaIndexBuffer);
+ // Write File Info.
+ fileInfoBlock.add(
+ new String(LAST_KEY.getBytes(), StandardCharsets.UTF_8),
+ addKeyLength(lastKey));
+ fileInfoBlock.setStartOffsetInBuffForWrite(currentOffset);
+ writeBuffer(fileInfoBlock.serialize());
+ }
+
+ private void writeTrailer() throws IOException {
+ HFileProtos.TrailerProto.Builder builder =
HFileProtos.TrailerProto.newBuilder();
+ builder.setFileInfoOffset(fileInfoBlock.getStartOffsetInBuffForWrite());
+ builder.setLoadOnOpenDataOffset(loadOnOpenSectionOffset);
+ builder.setUncompressedDataIndexSize(totalUncompressedDataBlockBytes);
+ builder.setDataIndexCount(rootIndexBlock.getNumOfEntries());
+ builder.setMetaIndexCount(metaIndexBlock.getNumOfEntries());
+ builder.setEntryCount(totalNumberOfRecords);
+ // TODO(HUDI-9464): support multiple levels.
+ builder.setNumDataIndexLevels(1);
+ builder.setFirstDataBlockOffset(firstDataBlockOffset);
+ builder.setLastDataBlockOffset(lastDataBlockOffset);
+ builder.setComparatorClassName(COMPARATOR_CLASS_NAME);
+ builder.setCompressionCodec(context.getCompressionCodec().getId());
+ builder.setEncryptionKey(ByteString.EMPTY);
+ HFileProtos.TrailerProto trailerProto = builder.build();
+
+ ByteBuffer trailer = ByteBuffer.allocate(TRAILER_SIZE);
+ trailer.limit(TRAILER_SIZE);
+ trailer.put(TRAILER.getMagic());
+
trailer.put(getVariableLengthEncodedBytes(trailerProto.getSerializedSize()));
+ trailer.put(trailerProto.toByteArray());
+ // Force trailer to have fixed length.
+ trailer.position(TRAILER_SIZE - 1);
+ trailer.put(HFILE_VERSION);
+
+ trailer.flip();
+ writeBuffer(trailer);
+ }
+
+ private void writeBuffer(ByteBuffer buffer) throws IOException {
+ // Note that: Use `write(byte[], off, len)`, instead of `write(byte[])`.
+ outputStream.write(buffer.array(), 0, buffer.limit());
+ currentOffset += buffer.limit();
+ }
+
+ private void initFileInfo() {
+ fileInfoBlock.add(
+ new String(MAX_MVCC_TS_KEY.getBytes(), StandardCharsets.UTF_8),
+ new byte[]{0});
+ }
+
+ // Note: HFileReaderImpl assumes that:
+ // The last key should contain the content length bytes.
+ public byte[] addKeyLength(byte[] key) {
+ if (0 == key.length) {
+ return new byte[0];
+ }
+ ByteBuffer byteBuffer = ByteBuffer.allocate(key.length + SIZEOF_INT16);
+ byteBuffer.putShort((short) key.length);
+ byteBuffer.put(key);
+ return byteBuffer.array();
+ }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/KeyValueEntry.java
similarity index 64%
copy from hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
copy to hudi-io/src/main/java/org/apache/hudi/io/hfile/KeyValueEntry.java
index 67ab0963824..479096c882b 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileMetaBlock.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/KeyValueEntry.java
@@ -22,18 +22,21 @@ package org.apache.hudi.io.hfile;
import java.nio.ByteBuffer;
/**
- * Represents a {@link HFileBlockType#META} block.
+ * This is the data structure used to store the data before it is written
+ * into the file. By comparison, {@link KeyValue} is used for read.
*/
-public class HFileMetaBlock extends HFileBlock {
- protected HFileMetaBlock(HFileContext context,
- byte[] byteBuff,
- int startOffsetInBuff) {
- super(context, HFileBlockType.META, byteBuff, startOffsetInBuff);
+public class KeyValueEntry implements Comparable<KeyValueEntry> {
+ public final byte[] key;
+ public final byte[] value;
+
+ public KeyValueEntry(byte[] key, byte[] value) {
+ this.key = key;
+ this.value = value;
}
- public ByteBuffer readContent() {
- return ByteBuffer.wrap(
- getByteBuff(),
- startOffsetInBuff + HFILEBLOCK_HEADER_SIZE,
uncompressedSizeWithoutHeader);
+ @Override
+ public int compareTo(KeyValueEntry o) {
+ return ByteBuffer.wrap(this.key)
+ .compareTo(ByteBuffer.wrap(o.key));
}
}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/UTF8StringKey.java
b/hudi-io/src/main/java/org/apache/hudi/io/hfile/UTF8StringKey.java
index 672d1a6690a..b1d6592eea2 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/UTF8StringKey.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/UTF8StringKey.java
@@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets;
*/
public class UTF8StringKey extends Key {
public UTF8StringKey(String key) {
-
super(key.getBytes(StandardCharsets.UTF_8));
}
diff --git
a/hudi-io/src/test/java/org/apache/hudi/io/compress/TestHoodieDecompressor.java
b/hudi-io/src/test/java/org/apache/hudi/io/compress/TestHoodieCompressor.java
similarity index 82%
rename from
hudi-io/src/test/java/org/apache/hudi/io/compress/TestHoodieDecompressor.java
rename to
hudi-io/src/test/java/org/apache/hudi/io/compress/TestHoodieCompressor.java
index d6883ce7743..c81af25e33e 100644
---
a/hudi-io/src/test/java/org/apache/hudi/io/compress/TestHoodieDecompressor.java
+++
b/hudi-io/src/test/java/org/apache/hudi/io/compress/TestHoodieCompressor.java
@@ -19,25 +19,25 @@
package org.apache.hudi.io.compress;
+import org.apache.hudi.io.compress.airlift.HoodieAirliftGzipCompressor;
+import org.apache.hudi.io.compress.builtin.HoodieNoneCompressor;
import org.apache.hudi.io.util.IOUtils;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
-import java.util.zip.GZIPOutputStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
- * Tests all implementation of {@link HoodieDecompressor}.
+ * Tests all implementation of {@link HoodieCompressor}.
*/
-public class TestHoodieDecompressor {
+public class TestHoodieCompressor {
private static final int INPUT_LENGTH = 394850;
private static final int[] READ_PART_SIZE_LIST =
new int[] {1200, 30956, 204958, INPUT_LENGTH + 50};
@@ -49,7 +49,7 @@ public class TestHoodieDecompressor {
switch (codec) {
case NONE:
case GZIP:
- HoodieDecompressor decompressor =
HoodieDecompressorFactory.getDecompressor(codec);
+ HoodieCompressor decompressor =
HoodieCompressorFactory.getCompressor(codec);
byte[] actualOutput = new byte[INPUT_LENGTH + 100];
try (InputStream stream = prepareInputStream(codec)) {
for (int sizeToRead : READ_PART_SIZE_LIST) {
@@ -65,20 +65,18 @@ public class TestHoodieDecompressor {
break;
default:
assertThrows(
- IllegalArgumentException.class, () ->
HoodieDecompressorFactory.getDecompressor(codec));
+ IllegalArgumentException.class, () ->
HoodieCompressorFactory.getCompressor(codec));
}
}
private static InputStream prepareInputStream(CompressionCodec codec) throws
IOException {
switch (codec) {
case NONE:
- return new ByteArrayInputStream(INPUT_BYTES);
+ return new ByteArrayInputStream(
+ new HoodieNoneCompressor().compress(INPUT_BYTES));
case GZIP:
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(stream))
{
- gzipOutputStream.write(INPUT_BYTES);
- }
- return new ByteArrayInputStream(stream.toByteArray());
+ return new ByteArrayInputStream(
+ new HoodieAirliftGzipCompressor().compress(INPUT_BYTES));
default:
throw new IllegalArgumentException("Not supported in tests.");
}
diff --git
a/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileWriter.java
b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileWriter.java
new file mode 100644
index 00000000000..a90c582c51f
--- /dev/null
+++ b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileWriter.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+
+import static org.apache.hudi.io.hfile.HFileBlockType.DATA;
+import static org.apache.hudi.io.hfile.HFileBlockType.TRAILER;
+import static org.apache.hudi.io.hfile.HFileInfo.LAST_KEY;
+import static org.apache.hudi.io.hfile.HFileInfo.MAX_MVCC_TS_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHFileWriter {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestHFileWriter.class);
+ private static final String TEST_FILE = "test.hfile";
+ private static final HFileContext CONTEXT = HFileContext.builder().build();
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ Files.deleteIfExists(Paths.get(TEST_FILE));
+ }
+
+ @Test
+ void testOverflow() throws Exception {
+ // 1. Write data.
+ writeTestFile();
+ // 2. Validate file size.
+ validateHFileSize();
+ // 3. Validate file structure.
+ validateHFileStructure();
+ // 4. Validate consistency with HFileReader.
+ validateConsistencyWithHFileReader();
+ LOG.info("All validations passed!");
+ }
+
+ @Test
+ void testSameKeyLocation() throws IOException {
+ // 50 bytes for data part limit.
+ HFileContext context = new HFileContext.Builder().blockSize(50).build();
+ String testFile = TEST_FILE;
+ try (DataOutputStream outputStream =
+ new DataOutputStream(Files.newOutputStream(Paths.get(testFile)));
+ HFileWriter writer = new HFileWriterImpl(context, outputStream)) {
+ for (int i = 0; i < 10; i++) {
+ writer.append("key00", String.format("value%02d", i).getBytes());
+ }
+ for (int i = 1; i < 11; i++) {
+ writer.append(
+ String.format("key%02d", i),
+ String.format("value%02d", i).getBytes());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // Validate.
+ try (FileChannel channel = FileChannel.open(Paths.get(testFile),
StandardOpenOption.READ)) {
+ ByteBuffer buf = channel.map(FileChannel.MapMode.READ_ONLY, 0,
channel.size());
+ SeekableDataInputStream inputStream =
+ new ByteArraySeekableDataInputStream(new
ByteBufferBackedInputStream(buf));
+ HFileReaderImpl reader = new HFileReaderImpl(inputStream,
channel.size());
+ reader.initializeMetadata();
+ assertEquals(20, reader.getNumKeyValueEntries());
+ HFileTrailer trailer = reader.getTrailer();
+ assertEquals(6, trailer.getDataIndexCount());
+ int i = 0;
+ for (Key key : reader.getDataBlockIndexMap().keySet()) {
+ assertArrayEquals(
+ String.format("key%02d", i).getBytes(),
+ key.getContentInString().getBytes());
+ if (i == 0) {
+ i++;
+ } else {
+ i += 2;
+ }
+ }
+ }
+ }
+
+ @Test
+ void testUniqueKeyLocation() throws IOException {
+ // 50 bytes for data part limit.
+ HFileContext context = new HFileContext.Builder().blockSize(50).build();
+ String testFile = TEST_FILE;
+ try (DataOutputStream outputStream =
+ new DataOutputStream(Files.newOutputStream(Paths.get(testFile)));
+ HFileWriter writer = new HFileWriterImpl(context, outputStream)) {
+ for (int i = 0; i < 50; i++) {
+ writer.append(
+ String.format("key%02d", i), String.format("value%02d",
i).getBytes());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // Validate.
+ try (FileChannel channel = FileChannel.open(Paths.get(testFile),
StandardOpenOption.READ)) {
+ ByteBuffer buf = channel.map(FileChannel.MapMode.READ_ONLY, 0,
channel.size());
+ SeekableDataInputStream inputStream =
+ new ByteArraySeekableDataInputStream(new
ByteBufferBackedInputStream(buf));
+ HFileReaderImpl reader = new HFileReaderImpl(inputStream,
channel.size());
+ reader.initializeMetadata();
+ assertEquals(50, reader.getNumKeyValueEntries());
+ HFileTrailer trailer = reader.getTrailer();
+ assertEquals(25, trailer.getDataIndexCount());
+ reader.seekTo();
+ for (int i = 0; i < 50; i++) {
+ KeyValue kv = reader.getKeyValue().get();
+ System.out.println(kv.getKey().getContentInString());
+ assertArrayEquals(
+ String.format("key%02d", i).getBytes(),
+ kv.getKey().getContentInString().getBytes());
+ assertArrayEquals(
+ String.format("value%02d", i).getBytes(),
+ Arrays.copyOfRange(
+ kv.getBytes(),
+ kv.getValueOffset(),
+ kv.getValueOffset() + kv.getValueLength())
+ );
+ reader.next();
+ }
+ }
+ }
+
+ private static void writeTestFile() throws Exception {
+ try (
+ DataOutputStream outputStream =
+ new DataOutputStream(Files.newOutputStream(Paths.get(TEST_FILE)));
+ HFileWriter writer = new HFileWriterImpl(CONTEXT, outputStream)) {
+ writer.append("key1", "value1".getBytes());
+ writer.append("key2", "value2".getBytes());
+ writer.append("key3", "value3".getBytes());
+ }
+ }
+
+ private static void validateHFileSize() throws IOException {
+ Path path = Paths.get(TEST_FILE);
+ long actualSize = Files.size(path);
+ long expectedSize = 4366L;
+ assertEquals(expectedSize, actualSize);
+ }
+
+ private static void validateHFileStructure() throws IOException {
+ ByteBuffer fileBuffer = mapFileToBuffer();
+
+ // 1. Validate Trailer
+ validateTrailer(fileBuffer);
+
+ // 2. Validate Data block.
+ validateDataBlocks(fileBuffer);
+ }
+
+ private static void validateConsistencyWithHFileReader() throws IOException {
+ ByteBuffer content = mapFileToBuffer();
+ try (HFileReader reader = new HFileReaderImpl(
+ new ByteArraySeekableDataInputStream(
+ new ByteBufferBackedInputStream(content)), content.limit())) {
+ reader.initializeMetadata();
+ assertEquals(3, reader.getNumKeyValueEntries());
+ assertTrue(reader.getMetaInfo(LAST_KEY).isPresent());
+ assertEquals(1, reader.getMetaInfo(MAX_MVCC_TS_KEY).get().length);
+ }
+ }
+
+ private static ByteBuffer mapFileToBuffer() throws IOException {
+ try (FileChannel channel = FileChannel.open(Paths.get(TEST_FILE),
StandardOpenOption.READ)) {
+ return channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
+ }
+ }
+
+ private static void validateTrailer(ByteBuffer buf) {
+ int trailerStart = Math.max(0, buf.limit() - 4096);
+ buf.position(trailerStart);
+
+ // Verify magic
+ byte[] trailerMagic = new byte[8];
+ buf.get(trailerMagic);
+ assertArrayEquals(TRAILER.getMagic(), trailerMagic);
+
+ // Verify version (last 4 bytes of trailer)
+ buf.position(trailerStart + 4096 - 4);
+ byte[] versionBytes = new byte[4];
+ buf.get(versionBytes);
+ int version = ByteBuffer.wrap(versionBytes).getInt();
+ assertEquals(3, version);
+ }
+
+ private static void validateDataBlocks(ByteBuffer buf) {
+ // Point to the first data block.
+ buf.position(0);
+
+ // Validate magic.
+ byte[] dataBlockMagic = new byte[8];
+ buf.get(dataBlockMagic);
+ assertArrayEquals(DATA.getMagic(), dataBlockMagic);
+
+ // Skip header.
+ buf.position(buf.position() + 25);
+
+ // Validate data.
+ validateKeyValue(buf, "key1", "value1");
+ validateKeyValue(buf, "key2", "value2");
+ validateKeyValue(buf, "key3", "value3");
+ }
+
+ private static void validateKeyValue(ByteBuffer buf, String expectedKey,
String expectedValue) {
+ int keyLen = buf.getInt();
+ int valLen = buf.getInt();
+
+ byte[] key = new byte[keyLen];
+ buf.get(key);
+ byte[] keyContent = Arrays.copyOfRange(key, 2, key.length);
+ assertArrayEquals(expectedKey.getBytes(StandardCharsets.UTF_8),
keyContent);
+
+ byte[] value = new byte[valLen];
+ buf.get(value);
+ assertArrayEquals(expectedValue.getBytes(StandardCharsets.UTF_8), value);
+
+ buf.get(); // Skip MVCC timestamp
+ }
+
+ private static void assertArrayEquals(byte[] expected, byte[] actual) {
+ if (!Arrays.equals(expected, actual)) {
+ throw new AssertionError("Byte array mismatch");
+ }
+ }
+}