nsivabalan commented on code in PR #13361:
URL: https://github.com/apache/hudi/pull/13361#discussion_r2114774513


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java:
##########
@@ -20,36 +20,45 @@
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.log.HoodieFileSliceReader;
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
 import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static org.apache.hudi.common.config.HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS;
 
 /**
  * Pluggable implementation for writing data into new file groups based on ClusteringPlan.
  */
 public abstract class ClusteringExecutionStrategy<T, I, K, O> implements Serializable {

Review Comment:
   Fair enough. Not really a blocking comment; we can take it up as a follow-up.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java:
##########
@@ -1146,7 +1149,7 @@ public void testInflightClusteringRollbackWhenUpdatesAllowed(boolean rollbackPen
     HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(EAGER);
     addConfigsForPopulateMetaFields(cfgBuilder, true);
     cfgBuilder.withClusteringConfig(clusteringConfig);
-    cfgBuilder.withProperties(getPropertiesForKeyGen());
+    cfgBuilder.withProperties(getPropertiesForKeyGen(true));

Review Comment:
   shouldn't this be 
   ```
   cfgBuilder.withProperties(getPropertiesForKeyGen(populateMetaFields));
   ```
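
   For illustration, a minimal sketch of the wiring this suggestion implies, assuming the test is parameterized on a boolean `populateMetaFields` (only the `true` literal appears in the diff, so the flag name is taken from the snippet above):
   ```
   // Sketch: thread the same flag through both config calls so meta-field
   // handling stays consistent (the diff currently hardcodes `true` in
   // addConfigsForPopulateMetaFields as well).
   HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(EAGER);
   addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
   cfgBuilder.withClusteringConfig(clusteringConfig);
   cfgBuilder.withProperties(getPropertiesForKeyGen(populateMetaFields));
   ```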



##########
hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java:
##########
@@ -275,25 +198,39 @@ public static HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(HoodieEng
      } else {
        readerSchema = tableSchema;
      }
-      return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, readerSchema, partition, dataFilePath, indexDefinition,
-          metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(""));
+      ClosableIterator<Pair<String, String>> secondaryIndexGenerator = createSecondaryIndexGenerator(readerContextFactory.getContext(), metaClient, fileSlice, readerSchema, indexDefinition,
+          metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(""), props, false);
+      return new CloseableMappingIterator<>(secondaryIndexGenerator, pair -> createSecondaryIndexRecord(pair.getKey(), pair.getValue(), indexDefinition.getIndexName(), false));
     });
   }
 
-  private static ClosableIterator<HoodieRecord> createSecondaryIndexGenerator(HoodieTableMetaClient metaClient,
-                                                                              EngineType engineType, List<String> logFilePaths,
-                                                                              Schema tableSchema, String partition,
-                                                                              Option<StoragePath> dataFilePath,
-                                                                              HoodieIndexDefinition indexDefinition,
-                                                                              String instantTime) throws Exception {
-    return new ClosableIterator<HoodieRecord>() {
-      private final HoodieFileSliceReader<HoodieRecord> fileSliceReader = getFileSliceReader(
-          metaClient, engineType, logFilePaths, tableSchema, partition, dataFilePath, instantTime);
-      private HoodieRecord nextValidRecord;
+  private static <T> ClosableIterator<Pair<String, String>> createSecondaryIndexGenerator(HoodieReaderContext<T> readerContext,

Review Comment:
   I know this method was not added in this patch, but can we rename it to `createSecondaryIndexRecordGenerator`?
   
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java:
##########
@@ -84,51 +93,50 @@ protected HoodieWriteConfig getWriteConfig() {
     return this.writeConfig;
   }
 
-  protected ClosableIterator<HoodieRecord<T>> getRecordIteratorWithLogFiles(ClusteringOperation operation, String instantTime, long maxMemory,
-                                                                            Option<BaseKeyGenerator> keyGeneratorOpt, Option<HoodieFileReader> baseFileReaderOpt) {
+  protected ClosableIterator<HoodieRecord<T>> getRecordIterator(ReaderContextFactory<T> readerContextFactory, ClusteringOperation operation, String instantTime, long maxMemory) {
     HoodieWriteConfig config = getWriteConfig();
+    TypedProperties props = TypedProperties.copy(config.getProps());
+    props.setProperty(MAX_MEMORY_FOR_MERGE.key(), Long.toString(maxMemory));
+
     HoodieTable table = getHoodieTable();
-    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
-    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
-        .withStorage(table.getStorage())
-        .withBasePath(table.getMetaClient().getBasePath())
-        .withLogFilePaths(operation.getDeltaFilePaths())
-        .withReaderSchema(readerSchemaWithMetaFields)
-        .withLatestInstantTime(instantTime)
-        .withMaxMemorySizeInBytes(maxMemory)
-        .withReverseReader(config.getCompactionReverseLogReadEnabled())
-        .withBufferSize(config.getMaxDFSStreamBufferSize())
-        .withSpillableMapBasePath(config.getSpillableMapBasePath())
-        .withPartition(operation.getPartitionPath())
-        .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
-        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-        .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-        .withRecordMerger(config.getRecordMerger())
-        .withTableMetaClient(table.getMetaClient())
-        .build();
 
+    FileSlice fileSlice = clusteringOperationToFileSlice(table.getMetaClient().getBasePath().toString(), operation);
+    final boolean usePosition = getWriteConfig().getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
+    Option<InternalSchema> internalSchema = SerDeHelper.fromJson(getWriteConfig().getInternalSchema());
     try {
-      return new HoodieFileSliceReader(baseFileReaderOpt, scanner, readerSchemaWithMetaFields, tableConfig.getPreCombineField(), config.getRecordMerger(),
-          tableConfig.getProps(),
-          tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
-              tableConfig.getPartitionFieldProp())), keyGeneratorOpt);
+      return getFileGroupReader(table.getMetaClient(), fileSlice, readerSchemaWithMetaFields, internalSchema, readerContextFactory, instantTime, usePosition).getClosableHoodieRecordIterator();
     } catch (IOException e) {
       throw new HoodieClusteringException("Error reading file slices", e);
     }
   }
 
-  protected ClosableIterator<HoodieRecord<T>> getRecordIteratorWithBaseFileOnly(Option<BaseKeyGenerator> keyGeneratorOpt, HoodieFileReader baseFileReader) {
-    // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
-    //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
-    //       it since these records will be shuffled later.
-    ClosableIterator<HoodieRecord> baseRecordsIterator;
-    try {
-      baseRecordsIterator = baseFileReader.getRecordIterator(readerSchemaWithMetaFields);
-    } catch (IOException e) {
-      throw new HoodieClusteringException("Error reading base file", e);
-    }
-    return new CloseableMappingIterator(
-        baseRecordsIterator,
-        rec -> ((HoodieRecord) rec).copy().wrapIntoHoodieRecordPayloadWithKeyGen(readerSchemaWithMetaFields, writeConfig.getProps(), keyGeneratorOpt));
+  /**
+   * Construct FileSlice from a given clustering operation {@code clusteringOperation}.
+   */
+  protected FileSlice clusteringOperationToFileSlice(String basePath, ClusteringOperation clusteringOperation) {
+    String partitionPath = clusteringOperation.getPartitionPath();

Review Comment:
   gotcha
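
   For context, a rough sketch (editorial, not the patch's actual code) of how such a helper could assemble the FileSlice, assuming the ClusteringOperation accessors `getPartitionPath`/`getDataFilePath`/`getDeltaFilePaths`/`getFileId` and using the FSUtils/StringUtils/StoragePath imports added at the top of this file:
   ```
   protected FileSlice clusteringOperationToFileSlice(String basePath, ClusteringOperation op) {
     String partitionPath = op.getPartitionPath();
     boolean hasBaseFile = !StringUtils.isNullOrEmpty(op.getDataFilePath());
     // Derive the base instant time from the base file name when one exists;
     // the log-only case would need to derive it from the log files instead.
     String baseInstantTime = hasBaseFile
         ? FSUtils.getCommitTime(new StoragePath(op.getDataFilePath()).getName())
         : null;
     FileSlice fileSlice = new FileSlice(partitionPath, baseInstantTime, op.getFileId());
     if (hasBaseFile) {
       fileSlice.setBaseFile(new HoodieBaseFile(op.getDataFilePath()));
     }
     // Paths are assumed absolute here; relative paths would be resolved against basePath.
     op.getDeltaFilePaths().forEach(logPath -> fileSlice.addLogFile(new HoodieLogFile(new StoragePath(logPath))));
     return fileSlice;
   }
   ```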



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -471,12 +413,7 @@ public Iterator<InternalRow> call(ClusteringOperation clusteringOperation) throw
         }
 
         // instantiate FG reader
-        HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
-        HoodieReaderContext<InternalRow> readerContext = readerContextFactory.getContext();
-        HoodieFileGroupReader<InternalRow> fileGroupReader = HoodieFileGroupReader.<InternalRow>newBuilder()
-            .withReaderContext(readerContext).withHoodieTableMetaClient(metaClient).withLatestCommitTime(instantTime)
-            .withFileSlice(fileSlice).withDataSchema(readerSchema).withRequestedSchema(readerSchema).withInternalSchema(internalSchemaOption)
-            .withShouldUseRecordPosition(usePosition).withProps(metaClient.getTableConfig().getProps()).build();
+        HoodieFileGroupReader<InternalRow> fileGroupReader = getFileGroupReader(metaClient, fileSlice, readerSchema, internalSchemaOption, readerContextFactory, instantTime, usePosition);

Review Comment:
   cool
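
   For reference, a sketch of what the shared `getFileGroupReader` helper presumably wraps, reconstructed from the builder chain removed above (the helper's placement, generics, and exact signature in the patch may differ):
   ```
   protected <R> HoodieFileGroupReader<R> getFileGroupReader(HoodieTableMetaClient metaClient, FileSlice fileSlice,
                                                             Schema readerSchema, Option<InternalSchema> internalSchemaOption,
                                                             ReaderContextFactory<R> readerContextFactory, String instantTime,
                                                             boolean usePosition) {
     // Same builder chain as the inlined version this refactor replaces.
     HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
     return HoodieFileGroupReader.<R>newBuilder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metaClient)
         .withLatestCommitTime(instantTime)
         .withFileSlice(fileSlice)
         .withDataSchema(readerSchema)
         .withRequestedSchema(readerSchema)
         .withInternalSchema(internalSchemaOption)
         .withShouldUseRecordPosition(usePosition)
         .withProps(metaClient.getTableConfig().getProps())
         .build();
   }
   ```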



##########
hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java:
##########
@@ -275,25 +198,39 @@ public static HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(HoodieEng
      } else {
        readerSchema = tableSchema;
      }
-      return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, readerSchema, partition, dataFilePath, indexDefinition,
-          metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(""));
+      ClosableIterator<Pair<String, String>> secondaryIndexGenerator = createSecondaryIndexGenerator(readerContextFactory.getContext(), metaClient, fileSlice, readerSchema, indexDefinition,
+          metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::requestedTime).orElse(""), props, false);
+      return new CloseableMappingIterator<>(secondaryIndexGenerator, pair -> createSecondaryIndexRecord(pair.getKey(), pair.getValue(), indexDefinition.getIndexName(), false));
     });
   }
 
-  private static ClosableIterator<HoodieRecord> createSecondaryIndexGenerator(HoodieTableMetaClient metaClient,
-                                                                              EngineType engineType, List<String> logFilePaths,
-                                                                              Schema tableSchema, String partition,
-                                                                              Option<StoragePath> dataFilePath,
-                                                                              HoodieIndexDefinition indexDefinition,
-                                                                              String instantTime) throws Exception {
-    return new ClosableIterator<HoodieRecord>() {
-      private final HoodieFileSliceReader<HoodieRecord> fileSliceReader = getFileSliceReader(
-          metaClient, engineType, logFilePaths, tableSchema, partition, dataFilePath, instantTime);
-      private HoodieRecord nextValidRecord;
+  private static <T> ClosableIterator<Pair<String, String>> createSecondaryIndexGenerator(HoodieReaderContext<T> readerContext,

Review Comment:
   Javadocs, please: what do we return from here? Specifically, describe each constituent of the Pair.
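
   For instance, something along these lines (the left/right semantics below are an assumption inferred from the `createSecondaryIndexRecord(pair.getKey(), pair.getValue(), ...)` call in the diff, so please verify against the implementation):
   ```
   /**
    * Generates the secondary-index entries for the given file slice.
    *
    * @return a closable iterator of pairs in which the left element is the
    *         record key and the right element is the secondary key value
    *         extracted from the indexed field.
    */
   ```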


