vinothchandar commented on code in PR #17979:
URL: https://github.com/apache/hudi/pull/17979#discussion_r2718709762


##########
hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java:
##########
@@ -198,6 +209,41 @@ public Pair<ByteArrayOutputStream, Object> serializeRecordsToLogBlock(HoodieStor
                                                                         HoodieSchema readerSchema,
                                                                         String keyFieldName,
                                                                         Map<String, String> paramsMap) throws IOException {
-    throw new UnsupportedOperationException("serializeRecordsToLogBlock with iterator is not yet supported for Lance format");
+    return Pair.of(getByteArrayOutputStream(storage, records, writerSchema, readerSchema, keyFieldName, paramsMap, recordType), null);
+  }
+
+  private static HoodieConfig getHoodieConfig(Map<String, String> paramsMap) {
+    HoodieConfig config = new HoodieConfig();
+    paramsMap.forEach(config::setValue);
+    return config;
+  }
+
+  private static ByteArrayOutputStream getByteArrayOutputStream(HoodieStorage storage, Iterator<HoodieRecord> records, HoodieSchema writerSchema, HoodieSchema readerSchema, String keyFieldName,
+                                                                Map<String, String> paramsMap, HoodieRecord.HoodieRecordType recordType) throws IOException {
+    HoodieConfig config = getHoodieConfig(paramsMap);
+
+    File tempFile = File.createTempFile("lance-log-block-" + UUID.randomUUID(), ".lance");

Review Comment:
   Let's do one of these to ensure the file gets deleted on JVM exit, and maybe 
use the newer NIO `Files` helper?
   
   > Automatic Deletion: Temporary files often need to be cleaned up. You can 
use the deleteOnExit() method on the resulting File object to request that the 
file be deleted when the Java Virtual Machine terminates. For NIO, you can open 
the file using the DELETE_ON_CLOSE option to have it deleted when the file 
channel or stream is closed. 
   
   Can we pick up the `.lance` string from an enum of file extensions instead 
of hard-coding it?
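
   A minimal sketch of the two cleanup options quoted above, using only JDK 
calls. `LANCE_EXTENSION` is a hypothetical constant standing in for whatever 
extension enum the codebase provides, and the class and method names are 
illustrative, not part of this PR.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TempFileCleanupSketch {
  // Hypothetical stand-in for an extension enum/constant in the codebase.
  static final String LANCE_EXTENSION = ".lance";

  // Option 1: create the file with NIO and ask the JVM to delete it on
  // normal termination.
  static Path createWithDeleteOnExit() throws IOException {
    Path tempFile = Files.createTempFile("lance-log-block-", LANCE_EXTENSION);
    tempFile.toFile().deleteOnExit();
    return tempFile;
  }

  // Option 2: open the file with DELETE_ON_CLOSE so it is removed once the
  // stream is closed. The returned path no longer exists afterwards.
  static Path writeWithDeleteOnClose(byte[] payload) throws IOException {
    Path tempFile = Files.createTempFile("lance-log-block-", LANCE_EXTENSION);
    try (OutputStream out = Files.newOutputStream(
        tempFile, StandardOpenOption.WRITE, StandardOpenOption.DELETE_ON_CLOSE)) {
      out.write(payload);
    }
    return tempFile;
  }
}
```

   `Files.createTempFile` already generates a unique name, so the 
`UUID.randomUUID()` part of the prefix is likely unnecessary. `DELETE_ON_CLOSE` 
only fits when the same stream or channel is used for the whole lifetime of the 
file; since the writer here is handed a path and the bytes are read back 
separately, the `deleteOnExit()` variant is probably the closer match.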



##########
hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java:
##########
@@ -198,6 +209,41 @@ public Pair<ByteArrayOutputStream, Object> serializeRecordsToLogBlock(HoodieStor
                                                                         HoodieSchema readerSchema,
                                                                         String keyFieldName,
                                                                         Map<String, String> paramsMap) throws IOException {
-    throw new UnsupportedOperationException("serializeRecordsToLogBlock with iterator is not yet supported for Lance format");
+    return Pair.of(getByteArrayOutputStream(storage, records, writerSchema, readerSchema, keyFieldName, paramsMap, recordType), null);
+  }
+
+  private static HoodieConfig getHoodieConfig(Map<String, String> paramsMap) {
+    HoodieConfig config = new HoodieConfig();
+    paramsMap.forEach(config::setValue);
+    return config;
+  }
+
+  private static ByteArrayOutputStream getByteArrayOutputStream(HoodieStorage storage, Iterator<HoodieRecord> records, HoodieSchema writerSchema, HoodieSchema readerSchema, String keyFieldName,
+                                                                Map<String, String> paramsMap, HoodieRecord.HoodieRecordType recordType) throws IOException {
+    HoodieConfig config = getHoodieConfig(paramsMap);
+
+    File tempFile = File.createTempFile("lance-log-block-" + UUID.randomUUID(), ".lance");
+    StoragePath tempFilePath = new StoragePath(tempFile.toURI());
+
+    Object fileFormatMetadata;
+    try (HoodieFileWriter lanceWriter = HoodieFileWriterFactory.getFileWriter(null, tempFilePath, storage, config, writerSchema, new LocalTaskContextSupplier(), recordType)) {
+      while (records.hasNext()) {
+        HoodieRecord record = records.next();
+        String recordKey = record.getRecordKey(readerSchema, keyFieldName);
+        lanceWriter.write(recordKey, record, writerSchema);
+      }
+    }
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    byte[] buffer = new byte[COPY_BUFFER_SIZE];
+    try (FileInputStream fis = new FileInputStream(tempFile)) {
+      int bytesRead;
+      while ((bytesRead = fis.read(buffer)) != -1) {
+        outputStream.write(buffer, 0, bytesRead);
+      }
+      outputStream.flush();
+    }
+    tempFile.delete();

Review Comment:
   `deleteOnExit()` helps if, for example, the JVM hits an OOM and exits before 
reaching this line.
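
   One way to cover both cases, sketched with JDK calls only: delete eagerly in 
a `finally` block and keep `deleteOnExit()` as a fallback. `withTempFile` and 
`TempFileBody` are hypothetical names for illustration, not existing Hudi 
helpers.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class TempFileFinallySketch {
  // Hypothetical callback type: does the work against the temp file and
  // returns the bytes that should outlive it.
  interface TempFileBody {
    byte[] apply(Path tempFile) throws IOException;
  }

  static byte[] withTempFile(String prefix, String suffix, TempFileBody body) throws IOException {
    Path tempFile = Files.createTempFile(prefix, suffix);
    // Fallback: only honored on normal JVM termination.
    tempFile.toFile().deleteOnExit();
    try {
      return body.apply(tempFile);
    } finally {
      // Eager cleanup, also runs when the body throws.
      Files.deleteIfExists(tempFile);
    }
  }
}
```

   With this shape the file is removed as soon as the bytes are in memory, even 
when the write or the copy throws, and the `deleteOnExit()` registration only 
matters if the `finally` block is never reached.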



##########
hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java:
##########
@@ -198,6 +209,41 @@ public Pair<ByteArrayOutputStream, Object> serializeRecordsToLogBlock(HoodieStor
                                                                         HoodieSchema readerSchema,
                                                                         String keyFieldName,
                                                                         Map<String, String> paramsMap) throws IOException {
-    throw new UnsupportedOperationException("serializeRecordsToLogBlock with iterator is not yet supported for Lance format");
+    return Pair.of(getByteArrayOutputStream(storage, records, writerSchema, readerSchema, keyFieldName, paramsMap, recordType), null);
+  }
+
+  private static HoodieConfig getHoodieConfig(Map<String, String> paramsMap) {
+    HoodieConfig config = new HoodieConfig();
+    paramsMap.forEach(config::setValue);
+    return config;
+  }
+
+  private static ByteArrayOutputStream getByteArrayOutputStream(HoodieStorage storage, Iterator<HoodieRecord> records, HoodieSchema writerSchema, HoodieSchema readerSchema, String keyFieldName,
+                                                                Map<String, String> paramsMap, HoodieRecord.HoodieRecordType recordType) throws IOException {
+    HoodieConfig config = getHoodieConfig(paramsMap);
+
+    File tempFile = File.createTempFile("lance-log-block-" + UUID.randomUUID(), ".lance");
+    StoragePath tempFilePath = new StoragePath(tempFile.toURI());
+
+    Object fileFormatMetadata;
+    try (HoodieFileWriter lanceWriter = HoodieFileWriterFactory.getFileWriter(null, tempFilePath, storage, config, writerSchema, new LocalTaskContextSupplier(), recordType)) {
+      while (records.hasNext()) {
+        HoodieRecord record = records.next();
+        String recordKey = record.getRecordKey(readerSchema, keyFieldName);
+        lanceWriter.write(recordKey, record, writerSchema);
+      }
+    }
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    byte[] buffer = new byte[COPY_BUFFER_SIZE];
+    try (FileInputStream fis = new FileInputStream(tempFile)) {

Review Comment:
   Is there anything already in `IOUtils` that does this copy, so we don't have 
to hand-write the loop?
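
   If no suitable `IOUtils` helper turns up, the JDK itself can replace the 
manual buffer loop. A minimal sketch using `Files.copy(Path, OutputStream)`; 
the class and method names are illustrative only, and this is a JDK 
alternative, not a claim about what `IOUtils` offers.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class FileToBufferSketch {
  // Copies the whole file into an in-memory stream without a hand-written
  // read/write loop or a separate copy buffer.
  static ByteArrayOutputStream readFully(Path file) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Files.copy(file, out);
    return out;
  }
}
```

   With a `File` in hand, the call would be `readFully(tempFile.toPath())`, and 
`COPY_BUFFER_SIZE` would no longer be needed for this path.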


