nsivabalan commented on code in PR #13361:
URL: https://github.com/apache/hudi/pull/13361#discussion_r2114260663


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -471,12 +413,7 @@ public Iterator<InternalRow> call(ClusteringOperation 
clusteringOperation) throw
         }
 
         // instantiate FG reader
-        HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
-        HoodieReaderContext<InternalRow> readerContext = 
readerContextFactory.getContext();
-        HoodieFileGroupReader<InternalRow> fileGroupReader = 
HoodieFileGroupReader.<InternalRow>newBuilder()
-            
.withReaderContext(readerContext).withHoodieTableMetaClient(metaClient).withLatestCommitTime(instantTime)
-            
.withFileSlice(fileSlice).withDataSchema(readerSchema).withRequestedSchema(readerSchema).withInternalSchema(internalSchemaOption)
-            
.withShouldUseRecordPosition(usePosition).withProps(metaClient.getTableConfig().getProps()).build();
+        HoodieFileGroupReader<InternalRow> fileGroupReader = 
getFileGroupReader(metaClient, fileSlice, readerSchema, internalSchemaOption, 
readerContextFactory, instantTime, usePosition);

Review Comment:
   may be I am not aware of this. 
   does this seamlessly take care of bootstrap base path as well? 
   
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java:
##########
@@ -84,51 +93,50 @@ protected HoodieWriteConfig getWriteConfig() {
     return this.writeConfig;
   }
 
-  protected ClosableIterator<HoodieRecord<T>> 
getRecordIteratorWithLogFiles(ClusteringOperation operation, String 
instantTime, long maxMemory,
-                                                                            
Option<BaseKeyGenerator> keyGeneratorOpt, Option<HoodieFileReader> 
baseFileReaderOpt) {
+  protected ClosableIterator<HoodieRecord<T>> 
getRecordIterator(ReaderContextFactory<T> readerContextFactory, 
ClusteringOperation operation, String instantTime, long maxMemory) {
     HoodieWriteConfig config = getWriteConfig();
+    TypedProperties props = TypedProperties.copy(config.getProps());
+    props.setProperty(MAX_MEMORY_FOR_MERGE.key(), Long.toString(maxMemory));
+
     HoodieTable table = getHoodieTable();
-    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
-    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
-        .withStorage(table.getStorage())
-        .withBasePath(table.getMetaClient().getBasePath())
-        .withLogFilePaths(operation.getDeltaFilePaths())
-        .withReaderSchema(readerSchemaWithMetaFields)
-        .withLatestInstantTime(instantTime)
-        .withMaxMemorySizeInBytes(maxMemory)
-        .withReverseReader(config.getCompactionReverseLogReadEnabled())
-        .withBufferSize(config.getMaxDFSStreamBufferSize())
-        .withSpillableMapBasePath(config.getSpillableMapBasePath())
-        .withPartition(operation.getPartitionPath())
-        .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
-        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-        
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-        .withRecordMerger(config.getRecordMerger())
-        .withTableMetaClient(table.getMetaClient())
-        .build();
 
+    FileSlice fileSlice = 
clusteringOperationToFileSlice(table.getMetaClient().getBasePath().toString(), 
operation);
+    final boolean usePosition = 
getWriteConfig().getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
+    Option<InternalSchema> internalSchema = 
SerDeHelper.fromJson(getWriteConfig().getInternalSchema());
     try {
-      return new HoodieFileSliceReader(baseFileReaderOpt, scanner, 
readerSchemaWithMetaFields, tableConfig.getPreCombineField(), 
config.getRecordMerger(),
-          tableConfig.getProps(),
-          tableConfig.populateMetaFields() ? Option.empty() : 
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
-              tableConfig.getPartitionFieldProp())), keyGeneratorOpt);
+      return getFileGroupReader(table.getMetaClient(), fileSlice, 
readerSchemaWithMetaFields, internalSchema, readerContextFactory, instantTime, 
usePosition).getClosableHoodieRecordIterator();
     } catch (IOException e) {
       throw new HoodieClusteringException("Error reading file slices", e);
     }
   }
 
-  protected ClosableIterator<HoodieRecord<T>> 
getRecordIteratorWithBaseFileOnly(Option<BaseKeyGenerator> keyGeneratorOpt, 
HoodieFileReader baseFileReader) {
-    // NOTE: Record have to be cloned here to make sure if it holds low-level 
engine-specific
-    //       payload pointing into a shared, mutable (underlying) buffer we 
get a clean copy of
-    //       it since these records will be shuffled later.
-    ClosableIterator<HoodieRecord> baseRecordsIterator;
-    try {
-      baseRecordsIterator = 
baseFileReader.getRecordIterator(readerSchemaWithMetaFields);
-    } catch (IOException e) {
-      throw new HoodieClusteringException("Error reading base file", e);
-    }
-    return new CloseableMappingIterator(
-        baseRecordsIterator,
-        rec -> ((HoodieRecord) 
rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchemaWithMetaFields, 
writeConfig.getProps(), keyGeneratorOpt));
+  /**
+   * Construct FileSlice from a given clustering operation {@code 
clusteringOperation}.
+   */
+  protected FileSlice clusteringOperationToFileSlice(String basePath, 
ClusteringOperation clusteringOperation) {
+    String partitionPath = clusteringOperation.getPartitionPath();

Review Comment:
   I feel constructing the FileSlice based on a given clusteringOperation might 
be repeated thing. given HoodieFileGroupReader takes in a FileSlice as an 
argument. 
   
   can we move this to some common utility method. depending on bandwidth, we 
can punt fixing all other callers (outside of this patch) for now. but atleast 
file a tracking ticket and refer to the common utility method that we might add 
it as part of this patch. 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java:
##########
@@ -84,51 +93,50 @@ protected HoodieWriteConfig getWriteConfig() {
     return this.writeConfig;
   }
 
-  protected ClosableIterator<HoodieRecord<T>> 
getRecordIteratorWithLogFiles(ClusteringOperation operation, String 
instantTime, long maxMemory,
-                                                                            
Option<BaseKeyGenerator> keyGeneratorOpt, Option<HoodieFileReader> 
baseFileReaderOpt) {
+  protected ClosableIterator<HoodieRecord<T>> 
getRecordIterator(ReaderContextFactory<T> readerContextFactory, 
ClusteringOperation operation, String instantTime, long maxMemory) {
     HoodieWriteConfig config = getWriteConfig();
+    TypedProperties props = TypedProperties.copy(config.getProps());
+    props.setProperty(MAX_MEMORY_FOR_MERGE.key(), Long.toString(maxMemory));
+
     HoodieTable table = getHoodieTable();
-    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
-    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
-        .withStorage(table.getStorage())
-        .withBasePath(table.getMetaClient().getBasePath())
-        .withLogFilePaths(operation.getDeltaFilePaths())
-        .withReaderSchema(readerSchemaWithMetaFields)
-        .withLatestInstantTime(instantTime)
-        .withMaxMemorySizeInBytes(maxMemory)
-        .withReverseReader(config.getCompactionReverseLogReadEnabled())
-        .withBufferSize(config.getMaxDFSStreamBufferSize())
-        .withSpillableMapBasePath(config.getSpillableMapBasePath())
-        .withPartition(operation.getPartitionPath())
-        .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
-        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-        
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-        .withRecordMerger(config.getRecordMerger())
-        .withTableMetaClient(table.getMetaClient())
-        .build();
 
+    FileSlice fileSlice = 
clusteringOperationToFileSlice(table.getMetaClient().getBasePath().toString(), 
operation);
+    final boolean usePosition = 
getWriteConfig().getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
+    Option<InternalSchema> internalSchema = 
SerDeHelper.fromJson(getWriteConfig().getInternalSchema());
     try {
-      return new HoodieFileSliceReader(baseFileReaderOpt, scanner, 
readerSchemaWithMetaFields, tableConfig.getPreCombineField(), 
config.getRecordMerger(),
-          tableConfig.getProps(),
-          tableConfig.populateMetaFields() ? Option.empty() : 
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
-              tableConfig.getPartitionFieldProp())), keyGeneratorOpt);
+      return getFileGroupReader(table.getMetaClient(), fileSlice, 
readerSchemaWithMetaFields, internalSchema, readerContextFactory, instantTime, 
usePosition).getClosableHoodieRecordIterator();
     } catch (IOException e) {
       throw new HoodieClusteringException("Error reading file slices", e);
     }
   }
 
-  protected ClosableIterator<HoodieRecord<T>> 
getRecordIteratorWithBaseFileOnly(Option<BaseKeyGenerator> keyGeneratorOpt, 
HoodieFileReader baseFileReader) {
-    // NOTE: Record have to be cloned here to make sure if it holds low-level 
engine-specific
-    //       payload pointing into a shared, mutable (underlying) buffer we 
get a clean copy of
-    //       it since these records will be shuffled later.
-    ClosableIterator<HoodieRecord> baseRecordsIterator;
-    try {
-      baseRecordsIterator = 
baseFileReader.getRecordIterator(readerSchemaWithMetaFields);
-    } catch (IOException e) {
-      throw new HoodieClusteringException("Error reading base file", e);
-    }
-    return new CloseableMappingIterator(
-        baseRecordsIterator,
-        rec -> ((HoodieRecord) 
rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchemaWithMetaFields, 
writeConfig.getProps(), keyGeneratorOpt));
+  /**
+   * Construct FileSlice from a given clustering operation {@code 
clusteringOperation}.
+   */
+  protected FileSlice clusteringOperationToFileSlice(String basePath, 
ClusteringOperation clusteringOperation) {
+    String partitionPath = clusteringOperation.getPartitionPath();
+    boolean baseFileExists = 
!StringUtils.isNullOrEmpty(clusteringOperation.getDataFilePath());
+    HoodieBaseFile baseFile = baseFileExists ? new HoodieBaseFile(new 
StoragePath(basePath, clusteringOperation.getDataFilePath()).toString()) : null;
+    List<HoodieLogFile> logFiles = 
clusteringOperation.getDeltaFilePaths().stream().map(p ->
+            new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
+                basePath, partitionPath), p)))
+        .sorted(new HoodieLogFile.LogFileComparator())
+        .collect(Collectors.toList());
+
+    ValidationUtils.checkState(baseFileExists || !logFiles.isEmpty(), "Both 
base file and log files are missing from this clustering operation " + 
clusteringOperation);
+    String baseInstantTime = baseFileExists ? baseFile.getCommitTime() : 
logFiles.get(0).getDeltaCommitTime();
+    FileSlice fileSlice = new FileSlice(partitionPath, baseInstantTime, 
clusteringOperation.getFileId());
+    fileSlice.setBaseFile(baseFile);

Review Comment:
   baseFile could be null as well. 
   can we maintain Option earlier and do option.ifPresent here 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java:
##########
@@ -84,51 +93,50 @@ protected HoodieWriteConfig getWriteConfig() {
     return this.writeConfig;
   }
 
-  protected ClosableIterator<HoodieRecord<T>> 
getRecordIteratorWithLogFiles(ClusteringOperation operation, String 
instantTime, long maxMemory,
-                                                                            
Option<BaseKeyGenerator> keyGeneratorOpt, Option<HoodieFileReader> 
baseFileReaderOpt) {
+  protected ClosableIterator<HoodieRecord<T>> 
getRecordIterator(ReaderContextFactory<T> readerContextFactory, 
ClusteringOperation operation, String instantTime, long maxMemory) {
     HoodieWriteConfig config = getWriteConfig();
+    TypedProperties props = TypedProperties.copy(config.getProps());
+    props.setProperty(MAX_MEMORY_FOR_MERGE.key(), Long.toString(maxMemory));
+
     HoodieTable table = getHoodieTable();
-    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
-    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
-        .withStorage(table.getStorage())
-        .withBasePath(table.getMetaClient().getBasePath())
-        .withLogFilePaths(operation.getDeltaFilePaths())
-        .withReaderSchema(readerSchemaWithMetaFields)
-        .withLatestInstantTime(instantTime)
-        .withMaxMemorySizeInBytes(maxMemory)
-        .withReverseReader(config.getCompactionReverseLogReadEnabled())
-        .withBufferSize(config.getMaxDFSStreamBufferSize())
-        .withSpillableMapBasePath(config.getSpillableMapBasePath())
-        .withPartition(operation.getPartitionPath())
-        .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
-        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-        
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-        .withRecordMerger(config.getRecordMerger())
-        .withTableMetaClient(table.getMetaClient())
-        .build();
 
+    FileSlice fileSlice = 
clusteringOperationToFileSlice(table.getMetaClient().getBasePath().toString(), 
operation);
+    final boolean usePosition = 
getWriteConfig().getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
+    Option<InternalSchema> internalSchema = 
SerDeHelper.fromJson(getWriteConfig().getInternalSchema());
     try {
-      return new HoodieFileSliceReader(baseFileReaderOpt, scanner, 
readerSchemaWithMetaFields, tableConfig.getPreCombineField(), 
config.getRecordMerger(),
-          tableConfig.getProps(),
-          tableConfig.populateMetaFields() ? Option.empty() : 
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
-              tableConfig.getPartitionFieldProp())), keyGeneratorOpt);
+      return getFileGroupReader(table.getMetaClient(), fileSlice, 
readerSchemaWithMetaFields, internalSchema, readerContextFactory, instantTime, 
usePosition).getClosableHoodieRecordIterator();
     } catch (IOException e) {
       throw new HoodieClusteringException("Error reading file slices", e);
     }
   }
 
-  protected ClosableIterator<HoodieRecord<T>> 
getRecordIteratorWithBaseFileOnly(Option<BaseKeyGenerator> keyGeneratorOpt, 
HoodieFileReader baseFileReader) {
-    // NOTE: Record have to be cloned here to make sure if it holds low-level 
engine-specific
-    //       payload pointing into a shared, mutable (underlying) buffer we 
get a clean copy of
-    //       it since these records will be shuffled later.
-    ClosableIterator<HoodieRecord> baseRecordsIterator;
-    try {
-      baseRecordsIterator = 
baseFileReader.getRecordIterator(readerSchemaWithMetaFields);
-    } catch (IOException e) {
-      throw new HoodieClusteringException("Error reading base file", e);
-    }
-    return new CloseableMappingIterator(
-        baseRecordsIterator,
-        rec -> ((HoodieRecord) 
rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchemaWithMetaFields, 
writeConfig.getProps(), keyGeneratorOpt));
+  /**
+   * Construct FileSlice from a given clustering operation {@code 
clusteringOperation}.
+   */
+  protected FileSlice clusteringOperationToFileSlice(String basePath, 
ClusteringOperation clusteringOperation) {
+    String partitionPath = clusteringOperation.getPartitionPath();
+    boolean baseFileExists = 
!StringUtils.isNullOrEmpty(clusteringOperation.getDataFilePath());
+    HoodieBaseFile baseFile = baseFileExists ? new HoodieBaseFile(new 
StoragePath(basePath, clusteringOperation.getDataFilePath()).toString()) : null;
+    List<HoodieLogFile> logFiles = 
clusteringOperation.getDeltaFilePaths().stream().map(p ->
+            new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
+                basePath, partitionPath), p)))
+        .sorted(new HoodieLogFile.LogFileComparator())

Review Comment:
   
https://github.com/apache/hudi/blob/94e4d3763e7919c674762625ea8f331629691ad7/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java#L125
 
   we already enforce ordering within FG reader. 
   can we avoid the duplicate ordering 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java:
##########
@@ -84,51 +93,50 @@ protected HoodieWriteConfig getWriteConfig() {
     return this.writeConfig;
   }
 
-  protected ClosableIterator<HoodieRecord<T>> 
getRecordIteratorWithLogFiles(ClusteringOperation operation, String 
instantTime, long maxMemory,
-                                                                            
Option<BaseKeyGenerator> keyGeneratorOpt, Option<HoodieFileReader> 
baseFileReaderOpt) {
+  protected ClosableIterator<HoodieRecord<T>> 
getRecordIterator(ReaderContextFactory<T> readerContextFactory, 
ClusteringOperation operation, String instantTime, long maxMemory) {
     HoodieWriteConfig config = getWriteConfig();
+    TypedProperties props = TypedProperties.copy(config.getProps());
+    props.setProperty(MAX_MEMORY_FOR_MERGE.key(), Long.toString(maxMemory));
+
     HoodieTable table = getHoodieTable();
-    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
-    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
-        .withStorage(table.getStorage())
-        .withBasePath(table.getMetaClient().getBasePath())
-        .withLogFilePaths(operation.getDeltaFilePaths())
-        .withReaderSchema(readerSchemaWithMetaFields)
-        .withLatestInstantTime(instantTime)
-        .withMaxMemorySizeInBytes(maxMemory)
-        .withReverseReader(config.getCompactionReverseLogReadEnabled())
-        .withBufferSize(config.getMaxDFSStreamBufferSize())
-        .withSpillableMapBasePath(config.getSpillableMapBasePath())
-        .withPartition(operation.getPartitionPath())
-        .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
-        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-        
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-        .withRecordMerger(config.getRecordMerger())
-        .withTableMetaClient(table.getMetaClient())
-        .build();
 
+    FileSlice fileSlice = 
clusteringOperationToFileSlice(table.getMetaClient().getBasePath().toString(), 
operation);
+    final boolean usePosition = 
getWriteConfig().getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
+    Option<InternalSchema> internalSchema = 
SerDeHelper.fromJson(getWriteConfig().getInternalSchema());
     try {
-      return new HoodieFileSliceReader(baseFileReaderOpt, scanner, 
readerSchemaWithMetaFields, tableConfig.getPreCombineField(), 
config.getRecordMerger(),
-          tableConfig.getProps(),
-          tableConfig.populateMetaFields() ? Option.empty() : 
Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
-              tableConfig.getPartitionFieldProp())), keyGeneratorOpt);
+      return getFileGroupReader(table.getMetaClient(), fileSlice, 
readerSchemaWithMetaFields, internalSchema, readerContextFactory, instantTime, 
usePosition).getClosableHoodieRecordIterator();
     } catch (IOException e) {
       throw new HoodieClusteringException("Error reading file slices", e);
     }
   }
 
-  protected ClosableIterator<HoodieRecord<T>> 
getRecordIteratorWithBaseFileOnly(Option<BaseKeyGenerator> keyGeneratorOpt, 
HoodieFileReader baseFileReader) {
-    // NOTE: Record have to be cloned here to make sure if it holds low-level 
engine-specific
-    //       payload pointing into a shared, mutable (underlying) buffer we 
get a clean copy of
-    //       it since these records will be shuffled later.
-    ClosableIterator<HoodieRecord> baseRecordsIterator;
-    try {
-      baseRecordsIterator = 
baseFileReader.getRecordIterator(readerSchemaWithMetaFields);
-    } catch (IOException e) {
-      throw new HoodieClusteringException("Error reading base file", e);
-    }
-    return new CloseableMappingIterator(
-        baseRecordsIterator,
-        rec -> ((HoodieRecord) 
rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchemaWithMetaFields, 
writeConfig.getProps(), keyGeneratorOpt));
+  /**
+   * Construct FileSlice from a given clustering operation {@code 
clusteringOperation}.
+   */
+  protected FileSlice clusteringOperationToFileSlice(String basePath, 
ClusteringOperation clusteringOperation) {
+    String partitionPath = clusteringOperation.getPartitionPath();

Review Comment:
   or if we taking care of all such callers in this patch, I am good. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java:
##########
@@ -20,36 +20,45 @@
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.log.HoodieFileSliceReader;
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS;
 
 /**
  * Pluggable implementation for writing data into new file groups based on 
ClusteringPlan.
  */
 public abstract class ClusteringExecutionStrategy<T, I, K, O> implements 
Serializable {

Review Comment:
   should we rename this class to `BaseClusteringExecutionStrategy`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to