vinothchandar commented on code in PR #17979:
URL: https://github.com/apache/hudi/pull/17979#discussion_r2718709762
##########
hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java:
##########
@@ -198,6 +209,41 @@ public Pair<ByteArrayOutputStream, Object>
serializeRecordsToLogBlock(HoodieStor
HoodieSchema readerSchema,
String
keyFieldName,
Map<String, String> paramsMap) throws IOException {
- throw new UnsupportedOperationException("serializeRecordsToLogBlock with
iterator is not yet supported for Lance format");
+ return Pair.of(getByteArrayOutputStream(storage, records, writerSchema,
readerSchema, keyFieldName, paramsMap, recordType), null);
+ }
+
+ private static HoodieConfig getHoodieConfig(Map<String, String> paramsMap) {
+ HoodieConfig config = new HoodieConfig();
+ paramsMap.forEach(config::setValue);
+ return config;
+ }
+
+ private static ByteArrayOutputStream getByteArrayOutputStream(HoodieStorage
storage, Iterator<HoodieRecord> records, HoodieSchema writerSchema,
HoodieSchema readerSchema, String keyFieldName,
+ Map<String,
String> paramsMap, HoodieRecord.HoodieRecordType recordType) throws IOException
{
+ HoodieConfig config = getHoodieConfig(paramsMap);
+
+ File tempFile = File.createTempFile("lance-log-block-" +
UUID.randomUUID(), ".lance");
Review Comment:
Let's do one of these to ensure the file gets deleted on JVM exit, and
maybe use the newer NIO `Files` helper?
> Automatic Deletion: Temporary files often need to be cleaned up. You can
use the deleteOnExit() method on the resulting File object to request that the
file be deleted when the Java Virtual Machine terminates. For NIO, you can open
the file using the DELETE_ON_CLOSE option to have it deleted when the file
channel or stream is closed.
Can we pick up the `.lance` string from some enum that has file extensions
instead of hard coding?
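
A minimal sketch of what this suggestion could look like, using only JDK calls (`Files.createTempFile`, `File.deleteOnExit`). The class name, the method name, and the `LANCE_EXTENSION` constant are hypothetical placeholders; in the PR the extension would presumably come from whichever existing enum holds file extensions, which is not shown here.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class TempFileSketch {
  // Hypothetical stand-in for a value taken from a shared file-extension enum.
  static final String LANCE_EXTENSION = ".lance";

  // Creates the temp file via NIO and registers it for deletion at normal JVM shutdown.
  static Path createTempLanceFile() throws IOException {
    // createTempFile already generates a unique name, so no UUID is appended here.
    Path tempPath = Files.createTempFile("lance-log-block-", LANCE_EXTENSION);
    tempPath.toFile().deleteOnExit();
    return tempPath;
  }

  public static void main(String[] args) throws IOException {
    Path tempPath = createTempLanceFile();
    System.out.println("created " + tempPath.getFileName());
    // Explicit deletion is still the primary cleanup path; deleteOnExit is a fallback.
    Files.deleteIfExists(tempPath);
  }
}
```

Note that `deleteOnExit()` keeps each registered path in memory until the JVM shuts down, so an explicit delete after use is still worthwhile in a long-running process.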
##########
hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java:
##########
@@ -198,6 +209,41 @@ public Pair<ByteArrayOutputStream, Object>
serializeRecordsToLogBlock(HoodieStor
HoodieSchema readerSchema,
String
keyFieldName,
Map<String, String> paramsMap) throws IOException {
- throw new UnsupportedOperationException("serializeRecordsToLogBlock with
iterator is not yet supported for Lance format");
+ return Pair.of(getByteArrayOutputStream(storage, records, writerSchema,
readerSchema, keyFieldName, paramsMap, recordType), null);
+ }
+
+ private static HoodieConfig getHoodieConfig(Map<String, String> paramsMap) {
+ HoodieConfig config = new HoodieConfig();
+ paramsMap.forEach(config::setValue);
+ return config;
+ }
+
+ private static ByteArrayOutputStream getByteArrayOutputStream(HoodieStorage
storage, Iterator<HoodieRecord> records, HoodieSchema writerSchema,
HoodieSchema readerSchema, String keyFieldName,
+ Map<String,
String> paramsMap, HoodieRecord.HoodieRecordType recordType) throws IOException
{
+ HoodieConfig config = getHoodieConfig(paramsMap);
+
+ File tempFile = File.createTempFile("lance-log-block-" +
UUID.randomUUID(), ".lance");
+ StoragePath tempFilePath = new StoragePath(tempFile.toURI());
+
+ Object fileFormatMetadata;
+ try (HoodieFileWriter lanceWriter =
HoodieFileWriterFactory.getFileWriter(null, tempFilePath, storage, config,
writerSchema, new LocalTaskContextSupplier(), recordType)) {
+ while (records.hasNext()) {
+ HoodieRecord record = records.next();
+ String recordKey = record.getRecordKey(readerSchema, keyFieldName);
+ lanceWriter.write(recordKey, record, writerSchema);
+ }
+ }
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ byte[] buffer = new byte[COPY_BUFFER_SIZE];
+ try (FileInputStream fis = new FileInputStream(tempFile)) {
+ int bytesRead;
+ while ((bytesRead = fis.read(buffer)) != -1) {
+ outputStream.write(buffer, 0, bytesRead);
+ }
+ outputStream.flush();
+ }
+ tempFile.delete();
Review Comment:
`deleteOnExit()` helps if, e.g., the JVM hits an OOM and exits before reaching this line.
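
One way to cover both cases, sketched under assumptions: a `try`/`finally` handles exceptions thrown while the file is in use, and `deleteOnExit()` is the fallback if the JVM shuts down before the `finally` block runs. `withTempFile` and `TempFileAction` are hypothetical names for illustration, not existing Hudi helpers.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class TempFileCleanupSketch {
  /** Hypothetical callback type: works against the temp file and returns a result. */
  interface TempFileAction<T> {
    T apply(Path tempFile) throws IOException;
  }

  static <T> T withTempFile(String prefix, String suffix, TempFileAction<T> action)
      throws IOException {
    Path tempFile = Files.createTempFile(prefix, suffix);
    // Fallback: requests deletion at normal JVM shutdown if the finally block never runs.
    tempFile.toFile().deleteOnExit();
    try {
      return action.apply(tempFile);
    } finally {
      // Primary cleanup: runs on both normal return and exception.
      Files.deleteIfExists(tempFile);
    }
  }

  public static void main(String[] args) throws IOException {
    Long size = withTempFile("lance-log-block-", ".lance", path -> {
      Files.write(path, new byte[] {1, 2, 3});
      return Files.size(path);
    });
    System.out.println(size);
  }
}
```

Neither mechanism helps if the process is killed abruptly (for example with `SIGKILL`), since shutdown hooks do not run in that case.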
##########
hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java:
##########
@@ -198,6 +209,41 @@ public Pair<ByteArrayOutputStream, Object>
serializeRecordsToLogBlock(HoodieStor
HoodieSchema readerSchema,
String
keyFieldName,
Map<String, String> paramsMap) throws IOException {
- throw new UnsupportedOperationException("serializeRecordsToLogBlock with
iterator is not yet supported for Lance format");
+ return Pair.of(getByteArrayOutputStream(storage, records, writerSchema,
readerSchema, keyFieldName, paramsMap, recordType), null);
+ }
+
+ private static HoodieConfig getHoodieConfig(Map<String, String> paramsMap) {
+ HoodieConfig config = new HoodieConfig();
+ paramsMap.forEach(config::setValue);
+ return config;
+ }
+
+ private static ByteArrayOutputStream getByteArrayOutputStream(HoodieStorage
storage, Iterator<HoodieRecord> records, HoodieSchema writerSchema,
HoodieSchema readerSchema, String keyFieldName,
+ Map<String,
String> paramsMap, HoodieRecord.HoodieRecordType recordType) throws IOException
{
+ HoodieConfig config = getHoodieConfig(paramsMap);
+
+ File tempFile = File.createTempFile("lance-log-block-" +
UUID.randomUUID(), ".lance");
+ StoragePath tempFilePath = new StoragePath(tempFile.toURI());
+
+ Object fileFormatMetadata;
+ try (HoodieFileWriter lanceWriter =
HoodieFileWriterFactory.getFileWriter(null, tempFilePath, storage, config,
writerSchema, new LocalTaskContextSupplier(), recordType)) {
+ while (records.hasNext()) {
+ HoodieRecord record = records.next();
+ String recordKey = record.getRecordKey(readerSchema, keyFieldName);
+ lanceWriter.write(recordKey, record, writerSchema);
+ }
+ }
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ byte[] buffer = new byte[COPY_BUFFER_SIZE];
+ try (FileInputStream fis = new FileInputStream(tempFile)) {
Review Comment:
Is there anything already in `IOUtils` that does this without hand-written copy code?
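
Whether the project's `IOUtils` already has such a helper is not confirmed here. As a point of comparison, the JDK's `Files.copy(Path, OutputStream)` replaces the manual buffer loop; the sketch below assumes the temp file is available as a `Path`, and `readFully` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class CopySketch {
  // Copies the whole file into an in-memory stream without a hand-written read loop.
  static ByteArrayOutputStream readFully(Path source) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Files.copy(source, out);
    return out;
  }

  public static void main(String[] args) throws IOException {
    Path tempPath = Files.createTempFile("copy-sketch-", ".bin");
    try {
      Files.write(tempPath, new byte[] {10, 20, 30, 40});
      ByteArrayOutputStream out = readFully(tempPath);
      System.out.println(out.size());
    } finally {
      Files.deleteIfExists(tempPath);
    }
  }
}
```

If only the raw bytes are needed rather than a stream, `Files.readAllBytes(Path)` is an even shorter option.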