codope commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1907401196


##########
hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java:
##########
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.HoodieFileSliceReader;
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
+import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
+import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.metadata.HoodieMetadataPayload.createSecondaryIndexRecord;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.filePath;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
+import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryResolveSchemaForTable;
+
+/**
+ * Utility methods for generating secondary index records during initialization and updates.
+ */
+public class SecondaryIndexRecordGenerationUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SecondaryIndexRecordGenerationUtils.class);
+
+  /**
+   * Converts the write stats to secondary index records.
+   *
+   * @param allWriteStats   list of write stats
+   * @param instantTime     instant time
+   * @param indexDefinition secondary index definition
+   * @param metadataConfig  metadata config
+   * @param fsView          file system view as of instant time
+   * @param dataMetaClient  data table meta client
+   * @param engineContext   engine context
+   * @param engineType      engine type (e.g. SPARK, FLINK or JAVA)
+   * @return {@link HoodieData} of {@link HoodieRecord} to be updated in the metadata table for the given secondary index partition
+   */
+  public static HoodieData<HoodieRecord> convertWriteStatsToSecondaryIndexRecords(List<HoodieWriteStat> allWriteStats,
+                                                                                  String instantTime,
+                                                                                  HoodieIndexDefinition indexDefinition,
+                                                                                  HoodieMetadataConfig metadataConfig,
+                                                                                  HoodieMetadataFileSystemView fsView,
+                                                                                  HoodieTableMetaClient dataMetaClient,
+                                                                                  HoodieEngineContext engineContext,
+                                                                                  EngineType engineType) {
+    // The secondary index cannot currently support log files containing inserts, so validate that first.
+    if (allWriteStats.stream().anyMatch(writeStat -> {
+      String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
+      return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
+    })) {
+      throw new HoodieIOException("Secondary index cannot support logs having inserts with current offering. Please disable secondary index.");
+    }
+
+    Schema tableSchema;
+    try {
+      tableSchema = tryResolveSchemaForTable(dataMetaClient).get();
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get latest schema for " + dataMetaClient.getBasePath(), e);
+    }
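+    // Group write stats by file id so each file group's previous and current file slices can be diffed independently.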
+    Map<String, List<HoodieWriteStat>> writeStatsByFileId = allWriteStats.stream().collect(Collectors.groupingBy(HoodieWriteStat::getFileId));
+    int parallelism = Math.max(Math.min(writeStatsByFileId.size(), metadataConfig.getSecondaryIndexParallelism()), 1);
+
+    return engineContext.parallelize(new ArrayList<>(writeStatsByFileId.entrySet()), parallelism).flatMap(writeStatsByFileIdEntry -> {
+      String fileId = writeStatsByFileIdEntry.getKey();
+      List<HoodieWriteStat> writeStats = writeStatsByFileIdEntry.getValue();
+      String partition = writeStats.get(0).getPartitionPath();
+      FileSlice previousFileSliceForFileId = fsView.getLatestFileSlice(partition, fileId).orElse(null);
+      Map<String, String> recordKeyToSecondaryKeyForPreviousFileSlice;
+      if (previousFileSliceForFileId == null) {
+        // new file slice, so empty mapping for previous slice
+        recordKeyToSecondaryKeyForPreviousFileSlice = Collections.emptyMap();
+      } else {
+        StoragePath previousBaseFile = previousFileSliceForFileId.getBaseFile().map(HoodieBaseFile::getStoragePath).orElse(null);
+        List<String> logFiles =
+            previousFileSliceForFileId.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
+        recordKeyToSecondaryKeyForPreviousFileSlice =
+            getRecordKeyToSecondaryKey(dataMetaClient, engineType, logFiles, tableSchema, partition, Option.ofNullable(previousBaseFile), indexDefinition, instantTime);
+      }
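+      // Build the same mapping for the current (possibly inflight) file slice produced by this commit.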
+      List<FileSlice> latestIncludingInflightFileSlices = getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(), partition);
+      FileSlice currentFileSliceForFileId = latestIncludingInflightFileSlices.stream().filter(fs -> fs.getFileId().equals(fileId)).findFirst()
+          .orElseThrow(() -> new HoodieException("Could not find any file slice for fileId " + fileId));
+      StoragePath currentBaseFile = currentFileSliceForFileId.getBaseFile().map(HoodieBaseFile::getStoragePath).orElse(null);
+      List<String> logFilesIncludingInflight =
+          currentFileSliceForFileId.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
+      Map<String, String> recordKeyToSecondaryKeyForCurrentFileSlice =
+          getRecordKeyToSecondaryKey(dataMetaClient, engineType, logFilesIncludingInflight, tableSchema, partition, Option.ofNullable(currentBaseFile), indexDefinition, instantTime);
+      // Determine which secondary index records should be deleted and which should be inserted:
+      // - an entry in recordKeyToSecondaryKeyForCurrentFileSlice but not in recordKeyToSecondaryKeyForPreviousFileSlice is an insert;
+      // - an entry present in both is an update, i.e. a delete of the old value plus an insert of the new one if the secondary key changed;
+      // - an entry in recordKeyToSecondaryKeyForPreviousFileSlice but not in recordKeyToSecondaryKeyForCurrentFileSlice is a delete.
+      List<HoodieRecord> records = new ArrayList<>();
+      recordKeyToSecondaryKeyForCurrentFileSlice.forEach((recordKey, secondaryKey) -> {
+        if (!recordKeyToSecondaryKeyForPreviousFileSlice.containsKey(recordKey)) {
+          records.add(createSecondaryIndexRecord(recordKey, secondaryKey, indexDefinition.getIndexName(), false));
+        } else {
+          // delete the previous entry and insert the new value if the secondary key has changed
+          if (!recordKeyToSecondaryKeyForPreviousFileSlice.get(recordKey).equals(secondaryKey)) {
+            records.add(createSecondaryIndexRecord(recordKey, recordKeyToSecondaryKeyForPreviousFileSlice.get(recordKey), indexDefinition.getIndexName(), true));
+            records.add(createSecondaryIndexRecord(recordKey, secondaryKey, indexDefinition.getIndexName(), false));
+          }
+        }
+      });
+      recordKeyToSecondaryKeyForPreviousFileSlice.forEach((recordKey, secondaryKey) -> {
+        if (!recordKeyToSecondaryKeyForCurrentFileSlice.containsKey(recordKey)) {
+          records.add(createSecondaryIndexRecord(recordKey, secondaryKey, indexDefinition.getIndexName(), true));
+        }
+      });
+      return records.iterator();
+    });
+  }
+
+  private static Map<String, String> getRecordKeyToSecondaryKey(HoodieTableMetaClient metaClient,
+                                                                EngineType engineType, List<String> logFilePaths,
+                                                                Schema tableSchema, String partition,
+                                                                Option<StoragePath> dataFilePath,
+                                                                HoodieIndexDefinition indexDefinition,
+                                                                String instantTime) throws Exception {
+    final String basePath = metaClient.getBasePath().toString();
+    final StorageConfiguration<?> storageConf = metaClient.getStorageConf();
+
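+    // The record merger combines base-file records with log records according to the table's merge strategy.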
+    HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(
+        basePath,
+        engineType,
+        Collections.emptyList(),
+        metaClient.getTableConfig().getRecordMergeStrategyId());
+
+    HoodieMergedLogRecordScanner mergedLogRecordScanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withStorage(metaClient.getStorage())
+        .withBasePath(metaClient.getBasePath())
+        .withLogFilePaths(logFilePaths)
+        .withReaderSchema(tableSchema)
+        .withLatestInstantTime(instantTime)
+        .withReverseReader(false)
+        .withMaxMemorySizeInBytes(storageConf.getLong(MAX_MEMORY_FOR_COMPACTION.key(), DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES))
+        .withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue())
+        .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
+        .withPartition(partition)
+        .withOptimizedLogBlocksScan(storageConf.getBoolean("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN, false))
+        .withDiskMapType(storageConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue()))
+        .withBitCaskDiskMapCompressionEnabled(storageConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
+        .withRecordMerger(recordMerger)
+        .withTableMetaClient(metaClient)
+        .build();
+
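+    // Open the base file (if present) so the slice reader can merge its records with the log records.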
+    Option<HoodieFileReader> baseFileReader = Option.empty();
+    if (dataFilePath.isPresent()) {
+      baseFileReader = Option.of(HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType()).getFileReader(getReaderConfigs(storageConf), dataFilePath.get()));
+    }
+    HoodieFileSliceReader fileSliceReader = new HoodieFileSliceReader(baseFileReader, mergedLogRecordScanner, tableSchema, metaClient.getTableConfig().getPreCombineField(), recordMerger,
+        metaClient.getTableConfig().getProps(), Option.empty(), Option.empty());
+    // Collect the records from the iterator into a map from record key to secondary key
+    Map<String, String> recordKeyToSecondaryKey = new HashMap<>();
+    while (fileSliceReader.hasNext()) {
+      HoodieRecord record = (HoodieRecord) fileSliceReader.next();
+      String secondaryKey = getSecondaryKey(record, tableSchema, indexDefinition);
+      if (secondaryKey != null) {
+        // no delete records here
+        recordKeyToSecondaryKey.put(record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD), secondaryKey);
+      }
+    }
+    return recordKeyToSecondaryKey;
+  }
+
+  private static String getSecondaryKey(HoodieRecord record, Schema tableSchema, HoodieIndexDefinition indexDefinition) {
+    try {
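+      // Convert to an Avro record and read the (possibly nested) secondary key field as a string.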
+      if (record.toIndexedRecord(tableSchema, CollectionUtils.emptyProps()).isPresent()) {
+        GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(tableSchema, CollectionUtils.emptyProps()).get()).getData();
+        String secondaryKeyFields = String.join(".", indexDefinition.getSourceFields());
+        return HoodieAvroUtils.getNestedFieldValAsString(genericRecord, secondaryKeyFields, true, false);
+      }
+      }
+    } catch (IOException e) {
+      LOG.debug("Failed to fetch secondary key for record key {}", record.getKey());
+    }
+    return null;
+  }
+
+  public static HoodieData<HoodieRecord> readSecondaryKeysFromFileSlices(HoodieEngineContext engineContext,
+                                                                         List<Pair<String, FileSlice>> partitionFileSlicePairs,
+                                                                         int secondaryIndexMaxParallelism,
+                                                                         String activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
+                                                                         HoodieIndexDefinition indexDefinition) {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+    final int parallelism = Math.min(partitionFileSlicePairs.size(), secondaryIndexMaxParallelism);
+    final StoragePath basePath = metaClient.getBasePath();
+    Schema tableSchema;
+    try {
+      tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get latest schema for " + metaClient.getBasePath(), e);
+    }
+
+    engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFileSlicePairs.size() + " file slices");
+    return engineContext.parallelize(partitionFileSlicePairs, parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final FileSlice fileSlice = partitionAndBaseFile.getValue();
+      List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(l -> l.getPath().toString()).collect(Collectors.toList());
+      Option<StoragePath> dataFilePath = Option.ofNullable(fileSlice.getBaseFile().map(baseFile -> filePath(basePath, partition, baseFile.getFileName())).orElse(null));
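+      // Prefer the schema written in the base file; log-only slices fall back to the table schema.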
+      Schema readerSchema;
+      if (dataFilePath.isPresent()) {
+        readerSchema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
+            .getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat())
+            .readAvroSchema(metaClient.getStorage(), dataFilePath.get());
+      } else {
+        readerSchema = tableSchema;
+      }
+      return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, readerSchema, partition, dataFilePath, indexDefinition,

Review Comment:
   > what happens when ingestion commit (DC), adds a new base file for an existing file slice?
   
   I guess you mean file group and not file slice, because a file slice is associated with exactly one base file. In that case, the previous file slice will contain the base file and the log files only as of the last committed time, while the current file slice will contain the new base file.
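   
   A minimal sketch of that case (hypothetical record keys and secondary keys, just to illustrate the map diff done in `convertWriteStatsToSecondaryIndexRecords`):
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   // Mapping from the last committed file slice (old base file + logs),
   // as produced by getRecordKeyToSecondaryKey
   Map<String, String> previous = new HashMap<>();
   previous.put("key1", "skA");
   previous.put("key2", "skB");
   
   // Mapping from the current slice after the commit rewrote the base file
   Map<String, String> current = new HashMap<>();
   current.put("key1", "skA"); // unchanged -> no index record emitted
   current.put("key2", "skC"); // secondary key changed -> delete (key2, skB) + insert (key2, skC)
   current.put("key3", "skD"); // new record key -> insert (key3, skD)
   // a key present in previous but absent from current would emit a delete record
   ```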
   
   > do we have tests for this scenario.
   
   Doesn't COW basically cover this?


