jonvex commented on code in PR #12343:
URL: https://github.com/apache/hudi/pull/12343#discussion_r1861180723


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java:
##########
@@ -278,4 +311,117 @@ public CompactionExecutionHelper 
getCompactionExecutionStrategy(HoodieCompaction
     }
   }
 
+  List<WriteStatus> compactWithPartialUpdate(HoodieReaderContext readerContext,
+                                             String instantTime,
+                                             HoodieTableMetaClient metaClient,
+                                             CompactionOperation operation,
+                                             List<String> logFilePaths,
+                                             Schema readerSchema,
+                                             Option<InternalSchema> 
internalSchemaOpt,
+                                             TypedProperties props,
+                                             HoodieWriteConfig config,
+                                             TaskContextSupplier 
taskContextSupplier) throws IOException {
+    HoodieTimer timer = HoodieTimer.start();
+    timer.startTimer();
+    List<HoodieLogFile> logFiles = logFilePaths.stream()
+        .map(p -> new HoodieLogFile(new StoragePath(p))).collect(toList());
+    Option<HoodieBaseFile> baseFileOpt =
+        operation.getBaseFile(metaClient.getBasePath().toString(), 
operation.getPartitionPath());
+    FileSlice fileSlice = new FileSlice(
+        operation.getFileGroupId(),
+        operation.getBaseInstantTime(),
+        baseFileOpt.isPresent() ? baseFileOpt.get() : null,
+        logFiles);
+    // 1. Generate the input for fg reader.
+    HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
+        readerContext,
+        metaClient.getStorage(),
+        metaClient.getBasePath().toString(),
+        operation.getBaseInstantTime(),
+        fileSlice,
+        readerSchema,
+        readerSchema,
+        internalSchemaOpt,
+        metaClient,
+        props,
+        0,
+        -1,
+        true);
+    // 2. Get the `HoodieFileGroupReaderIterator` from the fg reader.
+    fileGroupReader.initRecordIterators();
+    HoodieFileGroupReader.HoodieFileGroupReaderIterator<T> recordIterator
+        = fileGroupReader.getClosableIterator();
+    // 3. Write the record using parquet writer.
+    String writeToken = FSUtils.makeWriteToken(
+        taskContextSupplier.getPartitionIdSupplier().get(),
+        taskContextSupplier.getStageIdSupplier().get(),
+        taskContextSupplier.getAttemptIdSupplier().get());
+    String newFileName =
+        FSUtils.makeBaseFileName(instantTime, writeToken, 
operation.getFileId(), ".parquet");
+    // compaction has to use this schema.
+    Schema writeSchemaWithMetaFields = 
HoodieAvroUtils.addMetadataFields(readerSchema, 
config.allowOperationMetadataField());
+    HoodieFileWriter fileWriter = HoodieFileWriterFactory.getFileWriter(
+        operation.getDataFileCommitTime().get(),
+        new StoragePath(newFileName),
+        metaClient.getStorage(),
+        config,
+        writeSchemaWithMetaFields, taskContextSupplier, 
config.getRecordMerger().getRecordType());
+
+    WriteStatus writestatus = initNewStatus(config, true, operation);
+    long recordsWritten = 0;
+    long errorRecords = 0;
+    while (recordIterator.hasNext()) {
+      HoodieRecord record = (HoodieRecord) recordIterator.next();
+      try {
+        fileWriter.write(record.getRecordKey(), record, 
writeSchemaWithMetaFields);
+        recordsWritten++;
+        writestatus.markSuccess(record, record.getMetadata());
+      } catch (Throwable t) {
+        errorRecords++;
+        writestatus.markFailure(record, t, record.getMetadata());
+        LOG.error("Error write record: {}", record, t);
+      }
+    }
+
+    // 4. Construct the return values.
+    writestatus.setTotalRecords(recordsWritten);
+    writestatus.setTotalErrorRecords(errorRecords);
+    setupWriteStatus(new StoragePath(newFileName), writestatus, config, 
metaClient.getStorage(), timer);
+    List<WriteStatus> writeStatuses = 
SingletonList.newSingletonList(writestatus);

Review Comment:
   I think this is supposed to be `Collections.singletonList(writestatus)` (the static factory method in `java.util.Collections`), rather than `SingletonList.newSingletonList(...)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to