jonvex commented on code in PR #11210:
URL: https://github.com/apache/hudi/pull/11210#discussion_r1599326562
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java:
##########
@@ -128,4 +165,89 @@ public HoodieFileFormat getFormat() {
public void writeMetaFile(HoodieStorage storage, StoragePath filePath,
Properties props) throws IOException {
throw new UnsupportedOperationException("HFileUtils does not support
writeMetaFile");
}
+
+ @Override
+ public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+ List<HoodieRecord> records,
+ Schema writerSchema,
+ Schema readerSchema,
+ String keyFieldName,
+ Map<String, String> paramsMap)
throws IOException {
+ Compression.Algorithm compressionAlgorithm =
getHFileCompressionAlgorithm(paramsMap);
+ HFileContext context = new HFileContextBuilder()
+ .withBlockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
+ .withCompression(compressionAlgorithm)
+ .withCellComparator(new HoodieHBaseKVComparator())
+ .build();
+
+ Configuration conf = storageConf.unwrapAs(Configuration.class);
+ CacheConfig cacheConfig = new CacheConfig(conf);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+
+ // Use simple incrementing counter as a key
+ boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema,
keyFieldName).isPresent();
+ // This is set here to avoid re-computing this in the loop
+ int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) +
1 : -1;
+
+ // Serialize records into bytes
+ Map<String, List<byte[]>> sortedRecordsMap = new TreeMap<>();
+
+ Iterator<HoodieRecord> itr = records.iterator();
+ int id = 0;
+ while (itr.hasNext()) {
+ HoodieRecord<?> record = itr.next();
+ String recordKey;
+ if (useIntegerKey) {
+ recordKey = String.format("%" + keyWidth + "s", id++);
+ } else {
+ recordKey = getRecordKey(record, readerSchema, keyFieldName).get();
+ }
+
+ final byte[] recordBytes = serializeRecord(record, writerSchema,
keyFieldName);
+ // If key exists in the map, append to its list. If not, create a new
list.
+ // Get the existing list of recordBytes for the recordKey, or an empty
list if it doesn't exist
+ List<byte[]> recordBytesList = sortedRecordsMap.getOrDefault(recordKey,
new ArrayList<>());
+ recordBytesList.add(recordBytes);
+ // Put the updated list back into the map
+ sortedRecordsMap.put(recordKey, recordBytesList);
+ }
+
+ HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
+ .withOutputStream(ostream).withFileContext(context).create();
+
+ // Write the records
+ sortedRecordsMap.forEach((recordKey, recordBytesList) -> {
+ for (byte[] recordBytes : recordBytesList) {
+ try {
+ KeyValue kv = new KeyValue(recordKey.getBytes(), null, null,
recordBytes);
+ writer.append(kv);
+ } catch (IOException e) {
+ throw new HoodieIOException("IOException serializing records", e);
+ }
+ }
+ });
+
+ writer.appendFileInfo(
+ getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY),
getUTF8Bytes(readerSchema.toString()));
+
+ writer.close();
+ ostream.flush();
Review Comment:
Wouldn't we want to flush before closing the writer?
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java:
##########
@@ -35,21 +39,54 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.TreeMap;
+
+import static
org.apache.hudi.common.table.log.block.HoodieHFileDataBlock.HFILE_COMPRESSION_ALGO_PARAM_KEY;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
/**
* Utility functions for HFile files.
*/
-public class HFileUtils extends BaseFileUtils {
-
+public class HFileUtils extends FileFormatUtils {
private static final Logger LOG = LoggerFactory.getLogger(HFileUtils.class);
+ private static final int DEFAULT_BLOCK_SIZE_FOR_LOG_FILE = 1024 * 1024;
+
+ /**
+ * Gets the {@link Compression.Algorithm} Enum based on the {@link
CompressionCodec} name.
+ *
+ * @param paramsMap parameter map containing the compression codec config.
+ * @return the {@link Compression.Algorithm} Enum.
+ */
+ public static Compression.Algorithm getHFileCompressionAlgorithm(Map<String,
String> paramsMap) {
+ String algoName = paramsMap.get(HFILE_COMPRESSION_ALGO_PARAM_KEY);
+ if (algoName == null) {
Review Comment:
We should probably also check for an empty string here, not just null.
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java:
##########
@@ -128,4 +165,89 @@ public HoodieFileFormat getFormat() {
public void writeMetaFile(HoodieStorage storage, StoragePath filePath,
Properties props) throws IOException {
throw new UnsupportedOperationException("HFileUtils does not support
writeMetaFile");
}
+
+ @Override
+ public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+ List<HoodieRecord> records,
+ Schema writerSchema,
+ Schema readerSchema,
+ String keyFieldName,
+ Map<String, String> paramsMap)
throws IOException {
+ Compression.Algorithm compressionAlgorithm =
getHFileCompressionAlgorithm(paramsMap);
+ HFileContext context = new HFileContextBuilder()
+ .withBlockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
+ .withCompression(compressionAlgorithm)
+ .withCellComparator(new HoodieHBaseKVComparator())
+ .build();
+
+ Configuration conf = storageConf.unwrapAs(Configuration.class);
+ CacheConfig cacheConfig = new CacheConfig(conf);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+
+ // Use simple incrementing counter as a key
+ boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema,
keyFieldName).isPresent();
+ // This is set here to avoid re-computing this in the loop
+ int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) +
1 : -1;
+
+ // Serialize records into bytes
+ Map<String, List<byte[]>> sortedRecordsMap = new TreeMap<>();
+
+ Iterator<HoodieRecord> itr = records.iterator();
+ int id = 0;
+ while (itr.hasNext()) {
+ HoodieRecord<?> record = itr.next();
+ String recordKey;
+ if (useIntegerKey) {
+ recordKey = String.format("%" + keyWidth + "s", id++);
+ } else {
+ recordKey = getRecordKey(record, readerSchema, keyFieldName).get();
+ }
+
+ final byte[] recordBytes = serializeRecord(record, writerSchema,
keyFieldName);
+ // If key exists in the map, append to its list. If not, create a new
list.
+ // Get the existing list of recordBytes for the recordKey, or an empty
list if it doesn't exist
+ List<byte[]> recordBytesList = sortedRecordsMap.getOrDefault(recordKey,
new ArrayList<>());
+ recordBytesList.add(recordBytes);
+ // Put the updated list back into the map
+ sortedRecordsMap.put(recordKey, recordBytesList);
+ }
+
+ HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
+ .withOutputStream(ostream).withFileContext(context).create();
+
+ // Write the records
+ sortedRecordsMap.forEach((recordKey, recordBytesList) -> {
+ for (byte[] recordBytes : recordBytesList) {
+ try {
+ KeyValue kv = new KeyValue(recordKey.getBytes(), null, null,
recordBytes);
+ writer.append(kv);
+ } catch (IOException e) {
+ throw new HoodieIOException("IOException serializing records", e);
+ }
+ }
+ });
+
+ writer.appendFileInfo(
+ getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY),
getUTF8Bytes(readerSchema.toString()));
+
+ writer.close();
+ ostream.flush();
+ ostream.close();
+
+ return baos.toByteArray();
+ }
+
+ private Option<String> getRecordKey(HoodieRecord record, Schema
readerSchema, String keyFieldName) {
Review Comment:
This method can be made static, since it does not use any instance state.
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java:
##########
@@ -366,6 +382,35 @@ public void writeMetaFile(HoodieStorage storage,
}
}
+ @Override
+ public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+ List<HoodieRecord> records,
+ Schema writerSchema,
+ Schema readerSchema,
+ String keyFieldName,
+ Map<String, String> paramsMap)
throws IOException {
+ if (records.size() == 0) {
+ return new byte[0];
+ }
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ HoodieConfig config = new HoodieConfig();
+ paramsMap.entrySet().stream().forEach(entry ->
config.setValue(entry.getKey(), entry.getValue()));
+ config.setValue(PARQUET_BLOCK_SIZE.key(),
String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
+ config.setValue(PARQUET_PAGE_SIZE.key(),
String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
+ config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 *
1024));
Review Comment:
Should this value be a static var or something?
##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java:
##########
@@ -207,7 +208,7 @@ private static HoodieDataBlock createDataBlock(
false,
header,
HoodieRecord.RECORD_KEY_METADATA_FIELD,
- CompressionCodecName.GZIP,
+ "gzip",
Review Comment:
Is there no way to get this name from the enum instead of hard-coding the string?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java:
##########
@@ -74,11 +61,10 @@
* base file format.
*/
public class HoodieHFileDataBlock extends HoodieDataBlock {
+ public static final String HFILE_COMPRESSION_ALGO_PARAM_KEY =
"hfile_compression_algo";
Review Comment:
Why don't we have a hoodie config for this like we have with parquet? I
would rather just make the compression enum one of the params to
serializeRecordsToLogBlock rather than it being inside the map
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java:
##########
@@ -128,4 +165,89 @@ public HoodieFileFormat getFormat() {
public void writeMetaFile(HoodieStorage storage, StoragePath filePath,
Properties props) throws IOException {
throw new UnsupportedOperationException("HFileUtils does not support
writeMetaFile");
}
+
+ @Override
+ public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+ List<HoodieRecord> records,
+ Schema writerSchema,
+ Schema readerSchema,
+ String keyFieldName,
+ Map<String, String> paramsMap)
throws IOException {
+ Compression.Algorithm compressionAlgorithm =
getHFileCompressionAlgorithm(paramsMap);
+ HFileContext context = new HFileContextBuilder()
+ .withBlockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
+ .withCompression(compressionAlgorithm)
+ .withCellComparator(new HoodieHBaseKVComparator())
+ .build();
+
+ Configuration conf = storageConf.unwrapAs(Configuration.class);
+ CacheConfig cacheConfig = new CacheConfig(conf);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+
+ // Use simple incrementing counter as a key
+ boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema,
keyFieldName).isPresent();
+ // This is set here to avoid re-computing this in the loop
+ int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) +
1 : -1;
+
+ // Serialize records into bytes
+ Map<String, List<byte[]>> sortedRecordsMap = new TreeMap<>();
+
+ Iterator<HoodieRecord> itr = records.iterator();
+ int id = 0;
+ while (itr.hasNext()) {
+ HoodieRecord<?> record = itr.next();
+ String recordKey;
+ if (useIntegerKey) {
+ recordKey = String.format("%" + keyWidth + "s", id++);
+ } else {
+ recordKey = getRecordKey(record, readerSchema, keyFieldName).get();
+ }
+
+ final byte[] recordBytes = serializeRecord(record, writerSchema,
keyFieldName);
+ // If key exists in the map, append to its list. If not, create a new
list.
+ // Get the existing list of recordBytes for the recordKey, or an empty
list if it doesn't exist
+ List<byte[]> recordBytesList = sortedRecordsMap.getOrDefault(recordKey,
new ArrayList<>());
+ recordBytesList.add(recordBytes);
+ // Put the updated list back into the map
+ sortedRecordsMap.put(recordKey, recordBytesList);
+ }
+
+ HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
+ .withOutputStream(ostream).withFileContext(context).create();
+
+ // Write the records
+ sortedRecordsMap.forEach((recordKey, recordBytesList) -> {
+ for (byte[] recordBytes : recordBytesList) {
+ try {
+ KeyValue kv = new KeyValue(recordKey.getBytes(), null, null,
recordBytes);
+ writer.append(kv);
+ } catch (IOException e) {
+ throw new HoodieIOException("IOException serializing records", e);
+ }
+ }
+ });
+
+ writer.appendFileInfo(
+ getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY),
getUTF8Bytes(readerSchema.toString()));
+
+ writer.close();
+ ostream.flush();
+ ostream.close();
+
+ return baos.toByteArray();
+ }
+
+ private Option<String> getRecordKey(HoodieRecord record, Schema
readerSchema, String keyFieldName) {
+ return Option.ofNullable(record.getRecordKey(readerSchema, keyFieldName));
+ }
+
+ private byte[] serializeRecord(HoodieRecord<?> record, Schema schema, String
keyFieldName) throws IOException {
Review Comment:
I think this method can be made static as well.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java:
##########
@@ -99,29 +90,17 @@ public HoodieLogBlockType getBlockType() {
@Override
protected byte[] serializeRecords(List<HoodieRecord> records,
StorageConfiguration<?> storageConf) throws IOException {
- if (records.size() == 0) {
- return new byte[0];
- }
-
- Schema writerSchema = new
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- HoodieConfig config = new HoodieConfig();
- config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(),
compressionCodecName.get().name());
- config.setValue(PARQUET_BLOCK_SIZE.key(),
String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
- config.setValue(PARQUET_PAGE_SIZE.key(),
String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
- config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 *
1024));
- config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(),
String.valueOf(expectedCompressionRatio.get()));
- config.setValue(PARQUET_DICTIONARY_ENABLED,
String.valueOf(useDictionaryEncoding.get()));
- HoodieRecordType recordType = records.iterator().next().getRecordType();
- try (HoodieFileWriter parquetWriter =
HoodieFileWriterFactory.getFileWriter(
- HoodieFileFormat.PARQUET, outputStream, storageConf, config,
writerSchema, recordType)) {
- for (HoodieRecord<?> record : records) {
- String recordKey = getRecordKey(record).orElse(null);
- parquetWriter.write(recordKey, record, writerSchema);
- }
- outputStream.flush();
- }
- return outputStream.toByteArray();
+ Map<String, String> paramsMap = new HashMap<>();
+ paramsMap.put(PARQUET_COMPRESSION_CODEC_NAME.key(),
compressionCodecName.get());
+ paramsMap.put(PARQUET_COMPRESSION_RATIO_FRACTION.key(),
String.valueOf(expectedCompressionRatio.get()));
+ paramsMap.put(PARQUET_DICTIONARY_ENABLED.key(),
String.valueOf(useDictionaryEncoding.get()));
+
+ return FileFormatUtils.getInstance(PARQUET).serializeRecordsToLogBlock(
+ storageConf, records,
+ new
Schema.Parser().parse(super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA)),
Review Comment:
Maybe assign this expression to a local variable above, so that all the method
arguments fit on one line.
##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java:
##########
@@ -207,7 +208,7 @@ private static HoodieDataBlock createDataBlock(
false,
header,
HoodieRecord.RECORD_KEY_METADATA_FIELD,
- CompressionCodecName.GZIP,
+ "gzip",
Review Comment:
Could we use Compression.Algorithm.GZ.getName() (or something similar) instead
of the hard-coded "gzip" string?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]