yihua commented on code in PR #11210:
URL: https://github.com/apache/hudi/pull/11210#discussion_r1599346193


##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java:
##########
@@ -128,4 +165,89 @@ public HoodieFileFormat getFormat() {
  /**
   * Writing meta files is not supported for the HFile format.
   *
   * @param storage  storage abstraction the meta file would be written to (unused)
   * @param filePath target path of the meta file (unused)
   * @param props    properties that would be persisted into the meta file (unused)
   * @throws UnsupportedOperationException always; HFileUtils does not implement this operation
   */
  public void writeMetaFile(HoodieStorage storage, StoragePath filePath, Properties props) throws IOException {
    throw new UnsupportedOperationException("HFileUtils does not support writeMetaFile");
  }
+
+  @Override
+  public byte[] serializeRecordsToLogBlock(StorageConfiguration<?> storageConf,
+                                           List<HoodieRecord> records,
+                                           Schema writerSchema,
+                                           Schema readerSchema,
+                                           String keyFieldName,
+                                           Map<String, String> paramsMap) 
throws IOException {
+    Compression.Algorithm compressionAlgorithm = 
getHFileCompressionAlgorithm(paramsMap);
+    HFileContext context = new HFileContextBuilder()
+        .withBlockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
+        .withCompression(compressionAlgorithm)
+        .withCellComparator(new HoodieHBaseKVComparator())
+        .build();
+
+    Configuration conf = storageConf.unwrapAs(Configuration.class);
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+
+    // Use simple incrementing counter as a key
+    boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema, 
keyFieldName).isPresent();
+    // This is set here to avoid re-computing this in the loop
+    int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 
1 : -1;
+
+    // Serialize records into bytes
+    Map<String, List<byte[]>> sortedRecordsMap = new TreeMap<>();
+
+    Iterator<HoodieRecord> itr = records.iterator();
+    int id = 0;
+    while (itr.hasNext()) {
+      HoodieRecord<?> record = itr.next();
+      String recordKey;
+      if (useIntegerKey) {
+        recordKey = String.format("%" + keyWidth + "s", id++);
+      } else {
+        recordKey = getRecordKey(record, readerSchema, keyFieldName).get();
+      }
+
+      final byte[] recordBytes = serializeRecord(record, writerSchema, 
keyFieldName);
+      // If key exists in the map, append to its list. If not, create a new 
list.
+      // Get the existing list of recordBytes for the recordKey, or an empty 
list if it doesn't exist
+      List<byte[]> recordBytesList = sortedRecordsMap.getOrDefault(recordKey, 
new ArrayList<>());
+      recordBytesList.add(recordBytes);
+      // Put the updated list back into the map
+      sortedRecordsMap.put(recordKey, recordBytesList);
+    }
+
+    HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
+        .withOutputStream(ostream).withFileContext(context).create();
+
+    // Write the records
+    sortedRecordsMap.forEach((recordKey, recordBytesList) -> {
+      for (byte[] recordBytes : recordBytesList) {
+        try {
+          KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, 
recordBytes);
+          writer.append(kv);
+        } catch (IOException e) {
+          throw new HoodieIOException("IOException serializing records", e);
+        }
+      }
+    });
+
+    writer.appendFileInfo(
+        getUTF8Bytes(HoodieAvroHFileReaderImplBase.SCHEMA_KEY), 
getUTF8Bytes(readerSchema.toString()));
+
+    writer.close();
+    ostream.flush();
+    ostream.close();
+
+    return baos.toByteArray();
+  }
+
+  private Option<String> getRecordKey(HoodieRecord record, Schema 
readerSchema, String keyFieldName) {
+    return Option.ofNullable(record.getRecordKey(readerSchema, keyFieldName));
+  }
+
+  private byte[] serializeRecord(HoodieRecord<?> record, Schema schema, String 
keyFieldName) throws IOException {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to