nsivabalan commented on code in PR #14078:
URL: https://github.com/apache/hudi/pull/14078#discussion_r2427160981
##########
hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java:
##########
@@ -191,58 +190,51 @@ public ByteArrayOutputStream serializeRecordsToLogBlock(HoodieStorage storage,
                                                            Map<String, String> paramsMap) throws IOException {
     CompressionCodec compressionCodec = getHFileCompressionAlgorithm(paramsMap);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    OutputStream ostream = new DataOutputStream(baos);
-
-    // Use simple incrementing counter as a key
-    boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema, keyFieldName).isPresent();
-    // This is set here to avoid re-computing this in the loop
-    int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 1 : -1;
-
-    // Serialize records into bytes
-    Map<String, byte[]> sortedRecordsMap = new TreeMap<>();
-
-    Iterator<HoodieRecord> itr = records.iterator();
-    int id = 0;
-    Option<Schema.Field> keyField = Option.ofNullable(writerSchema.getField(keyFieldName));
-    while (itr.hasNext()) {
-      HoodieRecord<?> record = itr.next();
-      String recordKey;
-      if (useIntegerKey) {
-        recordKey = String.format("%" + keyWidth + "s", id++);
-      } else {
-        recordKey = getRecordKey(record, readerSchema, keyFieldName).get();
-      }
+    try (OutputStream ostream = new DataOutputStream(baos)) {
+      HFileContext context = HFileContext.builder()
+          .blockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
+          .compressionCodec(compressionCodec)
+          .build();
+      // Use simple incrementing counter as a key
+      boolean useIntegerKey = !getRecordKey(records.get(0), readerSchema, keyFieldName).isPresent();
+      // This is set here to avoid re-computing this in the loop
+      int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) + 1 : -1;
+      int id = 0;
+      Option<Schema.Field> keyField = Option.ofNullable(writerSchema.getField(keyFieldName));
+      try (HFileWriter writer = new HFileWriterImpl(context, ostream)) {
+        String previousRecordKey = null;
+        // It is assumed that the input records are sorted based on the record key
+        // for HFile block
+        for (int i = 0; i < records.size(); i++) {
+          HoodieRecord<?> record = records.get(i);
+          String recordKey = useIntegerKey
+              ? String.format("%" + keyWidth + "s", id++)
+              : getRecordKey(record, readerSchema, keyFieldName).get();
+          final byte[] recordBytes = serializeRecord(record, writerSchema, keyField);
+          // Since the list is sorted, duplicates will be adjacent.
+          if (i > 0 && recordKey.equals(previousRecordKey)) {
+            LOG.error("Found duplicate record with recordKey: {}", recordKey);
+            logRecordMetadata("Previous record", serializeRecord(records.get(i - 1), writerSchema, keyField), writerSchema);
+            logRecordMetadata("Current record", serializeRecord(record, writerSchema, keyField), writerSchema);
+            throw new HoodieException(String.format(
+                "Writing multiple records with same key %s not supported for Hfile format with Metadata table", recordKey));
+          }
+          try {
+            writer.append(recordKey, recordBytes);
+          } catch (IOException e) {
+            throw new HoodieIOException("IOException serializing records", e);
+          }
+          previousRecordKey = recordKey;
+        }
-      final byte[] recordBytes = serializeRecord(record, writerSchema, keyField);
-      if (sortedRecordsMap.containsKey(recordKey)) {
-        LOG.error("Found duplicate record with recordKey: {} ", recordKey);
-        logRecordMetadata("Previous record", sortedRecordsMap.get(recordKey), writerSchema);
-        logRecordMetadata("Current record", recordBytes, writerSchema);
-        throw new HoodieException(String.format("Writing multiple records with same key %s not supported for Hfile format with Metadata table",
-            recordKey));
+        writer.appendFileInfo(
+            HoodieAvroHFileReaderImplBase.SCHEMA_KEY,
+            getUTF8Bytes(readerSchema.toString()));
       }
-      sortedRecordsMap.put(recordKey, recordBytes);
-    }
-
-    HFileContext context = HFileContext.builder()
-        .blockSize(DEFAULT_BLOCK_SIZE_FOR_LOG_FILE)
-        .compressionCodec(compressionCodec)
-        .build();
-    try (HFileWriter writer = new HFileWriterImpl(context, ostream)) {
-      sortedRecordsMap.forEach((recordKey,recordBytes) -> {
-        try {
-          writer.append(recordKey, recordBytes);
-        } catch (IOException e) {
-          throw new HoodieIOException("IOException serializing records", e);
-        }
-      });
-      writer.appendFileInfo(
-          HoodieAvroHFileReaderImplBase.SCHEMA_KEY,
-          getUTF8Bytes(readerSchema.toString()));
     }
-    ostream.flush();
Review Comment:
Should we use a try with an explicit `finally` and call `ostream.flush()` and `ostream.close()` within the `finally` block? Currently it looks like we are not calling `flush` anywhere.
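
For illustration, a minimal sketch of the flush-and-close pattern suggested above, reusing the `baos`/`ostream` names from the diff; the class and method names here are hypothetical, and this is not the actual Hudi change:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class ExplicitFlushSketch {
  byte[] serializeToBytes() throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    OutputStream ostream = new DataOutputStream(baos);
    try {
      // ... append serialized records through the HFile writer here ...
    } finally {
      // Explicit flush before close, as the review comment suggests.
      // Note: DataOutputStream.close() (via FilterOutputStream) also flushes,
      // which is what the try-with-resources in the diff appears to rely on.
      ostream.flush();
      ostream.close();
    }
    return baos.toByteArray();
  }
}
```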