yihua commented on code in PR #12866:
URL: https://github.com/apache/hudi/pull/12866#discussion_r2106111181
##########
hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressorFactory.java:
##########
@@ -19,19 +19,22 @@
package org.apache.hudi.io.compress;
-import org.apache.hudi.io.compress.airlift.HoodieAirliftGzipDecompressor;
-import org.apache.hudi.io.compress.builtin.HoodieNoneDecompressor;
+import org.apache.hudi.io.compress.airlift.HoodieAirliftGzipCompressor;
+import org.apache.hudi.io.compress.builtin.HoodieNoneCompressor;
/**
- * Factory for {@link HoodieDecompressor}.
+ * Factory for {@link HoodieCompressor}.
*/
-public class HoodieDecompressorFactory {
- public static HoodieDecompressor getDecompressor(CompressionCodec
compressionCodec) {
+public class HoodieCompressorFactory {
+ private HoodieCompressorFactory() {
+ }
+
+ public static HoodieCompressor getCompressor(CompressionCodec
compressionCodec) {
switch (compressionCodec) {
case NONE:
- return new HoodieNoneDecompressor();
+ return new HoodieNoneCompressor();
case GZIP:
- return new HoodieAirliftGzipDecompressor();
+ return new HoodieAirliftGzipCompressor();
Review Comment:
Please also update the exception message below so that it matches the
rename from decompressor to compressor.
##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlock.java:
##########
@@ -187,34 +159,244 @@ public int getOnDiskSizeWithHeader() {
* @throws IOException upon decoding and decompression error.
*/
public void unpack() throws IOException {
- if (!isUnpacked) {
+ if (!readAttributesOpt.get().isUnpacked) {
// Should only be called for compressed blocks
CompressionCodec compression = context.getCompressionCodec();
if (compression != CompressionCodec.NONE) {
// Copy the block header which is not compressed
System.arraycopy(
- compressedByteBuff, startOffsetInCompressedBuff, byteBuff, 0,
HFILEBLOCK_HEADER_SIZE);
+ readAttributesOpt.get().compressedByteBuff,
+ readAttributesOpt.get().startOffsetInCompressedBuff,
+ readAttributesOpt.get().byteBuff,
+ 0,
+ HFILEBLOCK_HEADER_SIZE);
try (InputStream byteBuffInputStream = new ByteArrayInputStream(
- compressedByteBuff, startOffsetInCompressedBuff +
HFILEBLOCK_HEADER_SIZE, onDiskSizeWithoutHeader)) {
- context.getDecompressor().decompress(
+ readAttributesOpt.get().compressedByteBuff,
+ readAttributesOpt.get().startOffsetInCompressedBuff +
HFILEBLOCK_HEADER_SIZE,
+ readAttributesOpt.get().onDiskSizeWithoutHeader)) {
+ context.getCompressor().decompress(
byteBuffInputStream,
- byteBuff,
+ readAttributesOpt.get().byteBuff,
HFILEBLOCK_HEADER_SIZE,
- byteBuff.length - HFILEBLOCK_HEADER_SIZE);
+ readAttributesOpt.get().byteBuff.length -
HFILEBLOCK_HEADER_SIZE);
}
}
- isUnpacked = true;
+ readAttributesOpt.get().isUnpacked = true;
}
}
+ // ================ Below are for Write ================
+
/**
- * Allocates new byte buffer for the uncompressed bytes.
- *
- * @return a new byte array based on the size of uncompressed data, holding
the same header
- * bytes.
+ * Returns serialized "data" part of the block.
+ * This function must be implemented by each block type separately.
+ */
+ protected abstract ByteBuffer getUncompressedBlockDataToWrite();
+
+ /**
+ * Return serialized block including header, data, checksum.
+ */
+ public ByteBuffer serialize() throws IOException {
+ // Block payload.
+ ByteBuffer uncompressedBlockData = getUncompressedBlockDataToWrite();
+ // Compress if specified.
+ ByteBuffer compressedBlockData = compress(uncompressedBlockData);
+ // Buffer for building block.
+ ByteBuffer buf = ByteBuffer.allocate(Math.max(
+ context.getBlockSize() * 2,
+ compressedBlockData.limit() + HFILEBLOCK_HEADER_SIZE * 2));
+
+ // Block header
+ // 1. Magic is always 8 bytes.
+ buf.put(blockType.getMagic(), 0, 8);
+ // 2. onDiskSizeWithoutHeader.
+ buf.putInt(compressedBlockData.limit());
+ // 3. uncompressedSizeWithoutHeader.
+ buf.putInt(uncompressedBlockData.limit());
+ // 4. Previous block offset.
+ buf.putLong(previousBlockOffset);
+ // 5. Checksum type.
+ buf.put(context.getChecksumType().getCode());
+ // 6. Bytes covered per checksum.
+ buf.putInt(DEFAULT_BYTES_PER_CHECKSUM);
+ // 7. onDiskDataSizeWithHeader
+ int onDiskDataSizeWithHeader =
+ HFileBlock.HFILEBLOCK_HEADER_SIZE + uncompressedBlockData.limit();
+ buf.putInt(onDiskDataSizeWithHeader);
+ // 8. Payload.
+ buf.put(compressedBlockData);
+ // 9. Checksum.
+ buf.put(generateChecksumBytes(context.getChecksumType()));
+
+ // Update sizes
+ buf.flip();
+ return buf;
+ }
+
+ /**
+ * Compress block data without header and checksum.
+ */
+ protected ByteBuffer compress(ByteBuffer payload) throws IOException {
Review Comment:
This `compress` method can be inlined at its call site in `serialize()`.
##########
hudi-io/src/main/java/org/apache/hudi/io/compress/CompressionCodec.java:
##########
@@ -19,26 +19,61 @@
package org.apache.hudi.io.compress;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
/**
* Available compression codecs.
* There should not be any assumption on the ordering or ordinal of the
defined enums.
*/
public enum CompressionCodec {
- NONE("none"),
- BZIP2("bz2"),
- GZIP("gz"),
- LZ4("lz4"),
- LZO("lzo"),
- SNAPPY("snappy"),
- ZSTD("zstd");
+ NONE("none", 2),
Review Comment:
Storing the code inside the enum is good. We should now update
`createCompressionCodecMap` so that it no longer relies on hardcoded codes.
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java:
##########
@@ -60,7 +60,6 @@ protected HoodieAvroHFileReaderImplBase
createHFileReader(HoodieStorage storage,
protected void verifyHFileReader(byte[] content,
String hfileName,
boolean mayUseDefaultComparator,
- Class<?> expectedComparatorClazz,
Review Comment:
We need to keep this validation for backwards compatibility.
##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileDataBlock.java:
##########
@@ -157,72 +167,60 @@ private boolean isAtFirstKey(int relativeOffset) {
}
// ================ Below are for Write ================
- protected final List<KeyValueEntry> entries = new ArrayList<>();
+ private final List<KeyValueEntry> entries = new ArrayList<>();
- public HFileDataBlock(HFileContext context) {
- this(context,-1L);
+ static HFileDataBlock createWritableDataBlock(HFileContext context,
+ long previousBlockOffset) {
+ return new HFileDataBlock(context, previousBlockOffset);
}
- public HFileDataBlock(HFileContext context, long previousBlockOffset) {
- super(context, HFileBlockType.DATA, previousBlockOffset);
- // This is not used for write.
- uncompressedContentEndRelativeOffset = -1;
- }
-
- public List<KeyValueEntry> getEntries() {
- return entries;
- }
-
- public boolean isEmpty() {
+ boolean isEmpty() {
return entries.isEmpty();
}
- public void add(byte[] key, byte[] value) {
+ void add(byte[] key, byte[] value) {
KeyValueEntry kv = new KeyValueEntry(key, value);
// Assume all entries are sorted before write.
- add(kv, false);
+ entries.add(kv);
}
- public int getNumOfEntries() {
+ int getNumOfEntries() {
return entries.size();
}
- protected void add(KeyValueEntry kv, boolean sorted) {
- entries.add(kv);
- if (sorted) {
- entries.sort(KeyValueEntry::compareTo);
- }
- }
-
- public byte[] getFirstKey() {
+ byte[] getFirstKey() {
return entries.get(0).key;
}
- public byte[] getLastKeyContent() {
+ byte[] getLastKeyContent() {
if (entries.isEmpty()) {
return new byte[0];
}
return entries.get(entries.size() - 1).key;
}
@Override
- public ByteBuffer getPayload() {
- ByteBuffer dataBuf = ByteBuffer.allocate(context.getBlockSize() * 2);
+ protected ByteBuffer getUncompressedBlockDataToWrite() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ByteBuffer dataBuf = ByteBuffer.allocate(context.getBlockSize());
for (KeyValueEntry kv : entries) {
- // Length of key + length of a short variable indicating length of key;
+ // Length of key + length of a short variable indicating length of key.
dataBuf.putInt(kv.key.length + SIZEOF_INT16);
- // Length of value;
+ // Length of value.
dataBuf.putInt(kv.value.length);
- // Key content length;
+ // Key content length.
dataBuf.putShort((short)kv.key.length);
// Key.
dataBuf.put(kv.key);
// Value.
dataBuf.put(kv.value);
// MVCC.
dataBuf.put((byte)0);
+ // Copy to output stream.
+ baos.write(dataBuf.array(), 0, dataBuf.position());
+ // Clear the buffer.
+ dataBuf.clear();
}
- dataBuf.flip();
- return dataBuf;
+ return ByteBuffer.wrap(baos.toByteArray());
Review Comment:
Revisit this to see whether we should use only `ByteBuffer` here.
##########
hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java:
##########
@@ -74,20 +74,12 @@ protected HoodieAvroHFileReaderImplBase
createHFileReader(HoodieStorage storage,
protected void verifyHFileReader(byte[] content,
String hfileName,
boolean mayUseDefaultComparator,
- Class<?> expectedComparatorClazz,
int count) throws IOException {
HoodieStorage storage = HoodieTestUtils.getStorage(getFilePath());
try (HFile.Reader reader =
HoodieHFileUtils.createHFileReader(storage, new
StoragePath(DUMMY_BASE_PATH), content)) {
// HFile version is 3
assertEquals(3, reader.getTrailer().getMajorVersion());
- if (mayUseDefaultComparator && hfileName.contains("hudi_0_9")) {
- // Pre Hudi 0.10, the default comparator is used for metadata table
HFiles
- // For bootstrap index HFiles, the custom comparator is always used
- assertEquals(CellComparatorImpl.class,
reader.getComparator().getClass());
- } else {
- assertEquals(expectedComparatorClazz,
reader.getComparator().getClass());
- }
Review Comment:
Let's revert these changes, as the tests are disabled anyway. However, I will
follow up to understand why the tests cannot pass.
##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileDataBlock.java:
##########
@@ -135,18 +151,76 @@ public KeyValue readKeyValue(int offset) {
* HFile.
* @return {@code true} if there is next {@link KeyValue}; {code false}
otherwise.
*/
- public boolean next(HFileCursor cursor, int blockStartOffsetInFile) {
+ boolean next(HFileCursor cursor, int blockStartOffsetInFile) {
int offset = cursor.getOffset() - blockStartOffsetInFile;
Option<KeyValue> keyValue = cursor.getKeyValue();
if (!keyValue.isPresent()) {
keyValue = Option.of(readKeyValue(offset));
}
cursor.increment((long) KEY_OFFSET + (long) keyValue.get().getKeyLength()
- + (long) keyValue.get().getValueLength() +
ZERO_TS_VERSION_BYTE_LENGTH);
+ + keyValue.get().getValueLength() + ZERO_TS_VERSION_BYTE_LENGTH);
Review Comment:
Please revert this unnecessary change.
##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlock.java:
##########
@@ -68,43 +75,37 @@ static class Header {
}
protected final HFileContext context;
- protected final byte[] byteBuff;
- protected final int startOffsetInBuff;
- protected final int sizeCheckSum;
- protected final int uncompressedEndOffset;
+ protected final Option<HFileBlockReadAttributes> readAttributesOpt;
private final HFileBlockType blockType;
- protected final int onDiskSizeWithoutHeader;
- protected final int uncompressedSizeWithoutHeader;
- protected final int bytesPerChecksum;
- private boolean isUnpacked = false;
- protected byte[] compressedByteBuff;
- protected int startOffsetInCompressedBuff;
+ // Write properties
+ protected long startOffsetInBuff = -1;
+ protected long previousBlockOffset = -1;
+ protected int blockSize;
Review Comment:
These can be `private`
##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileWriterImpl.java:
##########
@@ -137,16 +138,16 @@ private void flushCurrentDataBlock() throws IOException {
rootIndexBlock.add(
currentDataBlock.getFirstKey(), lastDataBlockOffset,
blockBuffer.limit());
// 4. Create a new data block.
- currentDataBlock = new HFileDataBlock(context, currentOffset);
+ currentDataBlock = HFileDataBlock.createWritableDataBlock(context,
currentOffset);
}
// NOTE that: reader assumes that every meta info piece
// should be a separate meta block.
private void flushMetaBlocks() throws IOException {
for (Map.Entry<String, byte[]> e : metaInfo.entrySet()) {
- HFileMetaBlock currentMetaBlock = new HFileMetaBlock(context);
byte[] key = StringUtils.getUTF8Bytes(e.getKey());
- currentMetaBlock.add(key, e.getValue());
+ HFileMetaBlock currentMetaBlock =
Review Comment:
nit: `byte[] key = StringUtils.getUTF8Bytes(e.getKey())` can be inlined
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]