This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new be7120d6d91 [HUDI-7480] Fix functional index and avoid multiple 
initializations (#10860)
be7120d6d91 is described below

commit be7120d6d91c1371cb26924c94eb7b3647ff520e
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Apr 9 16:24:29 2024 +0530

    [HUDI-7480] Fix functional index and avoid multiple initializations (#10860)
    
    This PR fixes two issues:
    - When a functional index is updated, it was getting re-initialized. This 
was happening because metadata partition type only contains the functional 
index partition prefix while hoodie.properties contains the full name. Due to 
discrepancy, metadata writer thought there is a new index that needs to be 
initialized. This is fixed in `HoodieTableConfig#isMetadataPartitionAvailable`.
    - When a functional index is updated, it fetches latest file slices, 
including the pending ones, in data partition. There was a typo in building the 
filesystem view. Instead of passing data meta client, metadata meta client was 
passed.
    Added a test which covers both these scenarios.
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 24 +++++---
 .../hudi/common/table/HoodieTableConfig.java       |  8 ++-
 .../hudi/common/table/HoodieTableMetaClient.java   |  2 +-
 .../table/view/AbstractTableFileSystemView.java    | 15 ++++-
 .../table/view/HoodieTableFileSystemView.java      | 14 -----
 .../table/view/PriorityBasedFileSystemView.java    |  5 ++
 .../view/RemoteHoodieTableFileSystemView.java      | 13 +++++
 .../common/table/view/TableFileSystemView.java     |  8 +++
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  5 +-
 .../table/view/TestHoodieTableFileSystemView.java  | 49 ++++++++++++++++
 .../view/TestPriorityBasedFileSystemView.java      | 26 +++++++++
 .../hudi/command/index/TestFunctionalIndex.scala   | 68 ++++++++++++++++++++++
 .../hudi/timeline/service/RequestHandler.java      |  8 +++
 .../service/handlers/FileSliceHandler.java         |  5 ++
 14 files changed, 222 insertions(+), 28 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 99739947077..ca9c72ac601 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -97,7 +97,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
 import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
@@ -434,7 +433,12 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
             fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
             break;
           case FUNCTIONAL_INDEX:
-            fileGroupCountAndRecordsPair = 
initializeFunctionalIndexPartition();
+            Set<String> functionalIndexPartitionsToInit = 
getFunctionalIndexPartitionsToInit();
+            if (functionalIndexPartitionsToInit.isEmpty()) {
+              continue;
+            }
+            ValidationUtils.checkState(functionalIndexPartitionsToInit.size() 
== 1, "Only one functional index at a time is supported for now");
+            fileGroupCountAndRecordsPair = 
initializeFunctionalIndexPartition(functionalIndexPartitionsToInit.iterator().next());
             break;
           default:
             throw new HoodieMetadataException("Unsupported MDT partition type: 
" + partitionType);
@@ -520,9 +524,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
                                                                         int 
parallelism, Schema readerSchema,
                                                                         
SerializableConfiguration hadoopConf);
 
-  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeFunctionalIndexPartition() throws Exception {
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeFunctionalIndexPartition(String indexName) throws Exception {
     HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(dataMetaClient, 
dataMetaClient.getActiveTimeline(), metadata);
-    String indexName = 
dataWriteConfig.getFunctionalIndexConfig().getIndexName();
     HoodieFunctionalIndexDefinition indexDefinition = 
getFunctionalIndexDefinition(indexName);
     // Collect the list of latest file slices present in each partition
     List<String> partitions = metadata.getAllPartitionPaths();
@@ -534,10 +537,17 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
 
     int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getFunctionalIndexFileGroupCount();
     int parallelism = Math.min(partitionFileSlicePairs.size(), 
dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism());
-    Schema readerSchema = 
addMetadataFields(getProjectedSchemaForFunctionalIndex(indexDefinition, 
dataMetaClient), dataWriteConfig.allowOperationMetadataField());
+    Schema readerSchema = 
getProjectedSchemaForFunctionalIndex(indexDefinition, dataMetaClient);
     return Pair.of(fileGroupCount, 
getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, 
dataMetaClient, parallelism, readerSchema, hadoopConf));
   }
 
+  private Set<String> getFunctionalIndexPartitionsToInit() {
+    Set<String> functionalIndexPartitions = 
dataMetaClient.getFunctionalIndexMetadata().get().getIndexDefinitions().keySet();
+    Set<String> completedMetadataPartitions = 
dataMetaClient.getTableConfig().getMetadataPartitions();
+    functionalIndexPartitions.removeAll(completedMetadataPartitions);
+    return functionalIndexPartitions;
+  }
+
   private HoodieFunctionalIndexDefinition getFunctionalIndexDefinition(String 
indexName) {
     Option<HoodieFunctionalIndexMetadata> functionalIndexMetadata = 
dataMetaClient.getFunctionalIndexMetadata();
     if (functionalIndexMetadata.isPresent()) {
@@ -991,9 +1001,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   private HoodieData<HoodieRecord> 
getFunctionalIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition, String instantTime) throws Exception {
     HoodieFunctionalIndexDefinition indexDefinition = 
getFunctionalIndexDefinition(indexPartition);
     List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
-    HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient);
+    HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemView(dataMetaClient);
     commitMetadata.getPartitionToWriteStats().forEach((dataPartition, value) 
-> {
-      List<FileSlice> fileSlices = 
getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient, 
Option.ofNullable(fsView), dataPartition);
+      List<FileSlice> fileSlices = 
getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, 
Option.ofNullable(fsView), dataPartition);
       fileSlices.forEach(fileSlice -> {
         // Filter log files for the instant time and add to this partition 
fileSlice pairs
         List<HoodieLogFile> logFilesForInstant = fileSlice.getLogFiles()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 3cac820d4a8..9b0b294d3e6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -737,11 +737,13 @@ public class HoodieTableConfig extends HoodieConfig {
   /**
    * Checks if metadata table is enabled and the specified partition has been 
initialized.
    *
-   * @param partition The partition to check
+   * @param metadataPartitionType The metadata table partition type to check
    * @returns true if the specific partition has been initialized, else 
returns false.
    */
-  public boolean isMetadataPartitionAvailable(MetadataPartitionType partition) 
{
-    return getMetadataPartitions().contains(partition.getPartitionPath());
+  public boolean isMetadataPartitionAvailable(MetadataPartitionType 
metadataPartitionType) {
+    /*return getMetadataPartitions().stream().anyMatch(metadataPartition ->
+        metadataPartition.equals(metadataPartitionType.getPartitionPath()) || 
(FUNCTIONAL_INDEX.equals(metadataPartitionType) && 
metadataPartition.startsWith(FUNCTIONAL_INDEX.getPartitionPath())));*/
+    return 
getMetadataPartitions().contains(metadataPartitionType.getPartitionPath());
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 6468d165568..f9f23bbbd2d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -224,7 +224,7 @@ public class HoodieTableMetaClient implements Serializable {
    * Returns Option of {@link HoodieFunctionalIndexMetadata} from index 
definition file if present, else returns empty Option.
    */
   public Option<HoodieFunctionalIndexMetadata> getFunctionalIndexMetadata() {
-    if (functionalIndexMetadata.isPresent()) {
+    if (functionalIndexMetadata.isPresent() && 
!functionalIndexMetadata.get().getIndexDefinitions().isEmpty()) {
       return functionalIndexMetadata;
     }
     if (tableConfig.getIndexDefinitionPath().isPresent() && 
StringUtils.nonEmpty(tableConfig.getIndexDefinitionPath().get())) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index cdac0eeeb20..c1d60e1a33f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -451,7 +451,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
    *
    * @param partition partition to be loaded if not present
    */
-  private void ensurePartitionLoadedCorrectly(String partition) {
+  protected void ensurePartitionLoadedCorrectly(String partition) {
 
     ValidationUtils.checkArgument(!isClosed(), "View is already closed");
 
@@ -851,6 +851,19 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
     }
   }
 
+  public Stream<FileSlice> getLatestFileSlicesIncludingInflight(String 
partitionPath) {
+    try {
+      readLock.lock();
+      ensurePartitionLoadedCorrectly(partitionPath);
+      return fetchAllStoredFileGroups(partitionPath)
+          .map(HoodieFileGroup::getLatestFileSlicesIncludingInflight)
+          .filter(Option::isPresent)
+          .map(Option::get);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   @Override
   public final Stream<FileSlice> getLatestFileSlicesStateless(String 
partitionStr) {
     String partition = formatPartitionKey(partitionStr);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index 427258ff596..144e2bf8073 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -20,7 +20,6 @@ package org.apache.hudi.common.table.view;
 
 import org.apache.hudi.common.model.BootstrapBaseFileMapping;
 import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -421,19 +420,6 @@ public class HoodieTableFileSystemView extends 
IncrementalTimelineSyncFileSystem
     return Option.ofNullable(fgIdToReplaceInstants.get(fileGroupId));
   }
 
-  /**
-   * Get the latest file slices for a given partition including the inflight 
ones.
-   *
-   * @param partitionPath The partition path of interest
-   * @return Stream of latest {@link FileSlice} in the partition path.
-   */
-  public Stream<FileSlice> fetchLatestFileSlicesIncludingInflight(String 
partitionPath) {
-    return fetchAllStoredFileGroups(partitionPath)
-        .map(HoodieFileGroup::getLatestFileSlicesIncludingInflight)
-        .filter(Option::isPresent)
-        .map(Option::get);
-  }
-
   @Override
   public void close() {
     super.close();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index 1e4b1852d1b..8cfd6d64713 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -203,6 +203,11 @@ public class PriorityBasedFileSystemView implements 
SyncableFileSystemView, Seri
     return execute(partitionPath, preferredView::getLatestFileSlices, 
secondaryView::getLatestFileSlices);
   }
 
+  @Override
+  public Stream<FileSlice> getLatestFileSlicesIncludingInflight(String 
partitionPath) {
+    return execute(partitionPath, 
preferredView::getLatestFileSlicesIncludingInflight, 
secondaryView::getLatestFileSlicesIncludingInflight);
+  }
+
   @Override
   public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
     return execute(partitionPath, preferredView::getLatestFileSlicesStateless, 
secondaryView::getLatestFileSlicesStateless);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 61c90c6eb02..94f73e70c79 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -68,6 +68,7 @@ public class RemoteHoodieTableFileSystemView implements 
SyncableFileSystemView,
 
   private static final String BASE_URL = "/v1/hoodie/view";
   public static final String LATEST_PARTITION_SLICES_URL = 
String.format("%s/%s", BASE_URL, "slices/partition/latest/");
+  public static final String LATEST_PARTITION_SLICES_INFLIGHT_URL = 
String.format("%s/%s", BASE_URL, "slices/partition/latest/inflight/");
   public static final String LATEST_PARTITION_SLICES_STATELESS_URL = 
String.format("%s/%s", BASE_URL, "slices/partition/latest/stateless/");
   public static final String LATEST_PARTITION_SLICE_URL = 
String.format("%s/%s", BASE_URL, "slices/file/latest/");
   public static final String LATEST_PARTITION_UNCOMPACTED_SLICES_URL =
@@ -338,6 +339,18 @@ public class RemoteHoodieTableFileSystemView implements 
SyncableFileSystemView,
     }
   }
 
+  @Override
+  public Stream<FileSlice> getLatestFileSlicesIncludingInflight(String 
partitionPath) {
+    Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+    try {
+      List<FileSliceDTO> dataFiles = 
executeRequest(LATEST_PARTITION_SLICES_INFLIGHT_URL, paramsMap,
+          FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
+      return dataFiles.stream().map(FileSliceDTO::toFileSlice);
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
   @Override
   public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 87b3db142e6..76a928a0b5b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -107,6 +107,14 @@ public interface TableFileSystemView {
      */
     Stream<FileSlice> getLatestFileSlices(String partitionPath);
 
+    /**
+     * Get the latest file slices for a given partition including the inflight 
ones.
+     *
+     * @param partitionPath The partition path of interest
+     * @return Stream of latest {@link FileSlice} in the partition path.
+     */
+    Stream<FileSlice> getLatestFileSlicesIncludingInflight(String 
partitionPath);
+
     /**
      * Stream all the latest file slices in the given partition
      * without caching the file group mappings.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index e81cd63f66e..b67faddc5fb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -125,6 +125,7 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static 
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
+import static org.apache.hudi.avro.HoodieAvroUtils.getSchemaForFields;
 import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
@@ -1081,7 +1082,7 @@ public class HoodieTableMetadataUtil {
                                                                               
Option<HoodieTableFileSystemView> fileSystemView,
                                                                               
String partition) {
     HoodieTableFileSystemView fsView = fileSystemView.orElseGet(() -> 
getFileSystemView(metaClient));
-    Stream<FileSlice> fileSliceStream = 
fsView.fetchLatestFileSlicesIncludingInflight(partition);
+    Stream<FileSlice> fileSliceStream = 
fsView.getLatestFileSlicesIncludingInflight(partition);
     return fileSliceStream
         .sorted(Comparator.comparing(FileSlice::getFileId))
         .collect(Collectors.toList());
@@ -1919,7 +1920,7 @@ public class HoodieTableMetadataUtil {
   public static Schema 
getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition 
indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
     Schema tableSchema = schemaResolver.getTableAvroSchema();
-    return HoodieAvroUtils.getSchemaForFields(tableSchema, 
indexDefinition.getSourceFields());
+    return addMetadataFields(getSchemaForFields(tableSchema, 
indexDefinition.getSourceFields()));
   }
 
   private static Path filePath(String basePath, String partition, String 
filename) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index f5cee136d2e..80f403ce777 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -736,6 +736,55 @@ public class TestHoodieTableFileSystemView extends 
HoodieCommonTestHarness {
     }
   }
 
+  @Test
+  public void testGetLatestFileSlicesIncludingInflight() throws Exception {
+    String partitionPath = "2016/05/01";
+    new File(basePath + "/" + partitionPath).mkdirs();
+    String fileId = UUID.randomUUID().toString();
+
+    String instantTime1 = "1";
+    String deltaInstantTime1 = "2";
+    String deltaInstantTime2 = "3";
+
+    String dataFileName = FSUtils.makeBaseFileName(instantTime1, 
TEST_WRITE_TOKEN, fileId, BASE_FILE_EXTENSION);
+    new File(basePath + "/" + partitionPath + "/" + 
dataFileName).createNewFile();
+    String fileName1 =
+        FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, 
deltaInstantTime1, 0, TEST_WRITE_TOKEN);
+    String fileName2 =
+        FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, 
deltaInstantTime2, 1, TEST_WRITE_TOKEN);
+    new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
+    new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
+    HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+    HoodieInstant instant1 = new HoodieInstant(true, 
HoodieTimeline.COMMIT_ACTION, instantTime1);
+    HoodieInstant deltaInstant2 = new HoodieInstant(true, 
HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime1);
+    HoodieInstant deltaInstant3 = new HoodieInstant(true, 
HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime2);
+
+    // just commit instant1 and deltaInstant2, keep deltaInstant3 inflight
+    saveAsComplete(commitTimeline, instant1, Option.empty());
+    saveAsComplete(commitTimeline, deltaInstant2, Option.empty());
+    refreshFsView();
+
+    // getLatestFileSlices should return just 1 log file due to deltaInstant2
+    
rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList()).forEach(fs
 -> {
+      assertEquals(fs.getBaseInstantTime(), instantTime1);
+      assertTrue(fs.getBaseFile().isPresent());
+      assertEquals(fs.getBaseFile().get().getCommitTime(), instantTime1);
+      assertEquals(fs.getBaseFile().get().getFileId(), fileId);
+      assertEquals(fs.getBaseFile().get().getPath(), "file:" + basePath + "/" 
+ partitionPath + "/" + dataFileName);
+      assertEquals(fs.getLogFiles().count(), 1);
+    });
+
+    // getLatestFileSlicesIncludingInflight should return both the log files
+    
rtView.getLatestFileSlicesIncludingInflight(partitionPath).collect(Collectors.toList()).forEach(fs
 -> {
+      assertEquals(fs.getBaseInstantTime(), instantTime1);
+      assertTrue(fs.getBaseFile().isPresent());
+      assertEquals(fs.getBaseFile().get().getCommitTime(), instantTime1);
+      assertEquals(fs.getBaseFile().get().getFileId(), fileId);
+      assertEquals(fs.getBaseFile().get().getPath(), "file:" + basePath + "/" 
+ partitionPath + "/" + dataFileName);
+      assertEquals(fs.getLogFiles().count(), 2);
+    });
+  }
+
   /**
    * Helper method to test Views in the presence of concurrent compaction.
    *
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
index 1e2b8e0c35e..eb30e1387c1 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
@@ -357,6 +357,32 @@ public class TestPriorityBasedFileSystemView {
     });
   }
 
+  @Test
+  public void testGetLatestFileSlicesIncludingInflight() {
+    Stream<FileSlice> actual;
+    Stream<FileSlice> expected = testFileSliceStream;
+    String partitionPath = "/table2";
+
+    
when(primary.getLatestFileSlicesIncludingInflight(partitionPath)).thenReturn(testFileSliceStream);
+    actual = fsView.getLatestFileSlicesIncludingInflight(partitionPath);
+    assertEquals(expected, actual);
+
+    resetMocks();
+    
when(primary.getLatestFileSlicesIncludingInflight(partitionPath)).thenThrow(new 
RuntimeException());
+    
when(secondary.getLatestFileSlicesIncludingInflight(partitionPath)).thenReturn(testFileSliceStream);
+    actual = fsView.getLatestFileSlicesIncludingInflight(partitionPath);
+    assertEquals(expected, actual);
+
+    resetMocks();
+    
when(secondary.getLatestFileSlicesIncludingInflight(partitionPath)).thenReturn(testFileSliceStream);
+    actual = fsView.getLatestFileSlicesIncludingInflight(partitionPath);
+    assertEquals(expected, actual);
+
+    resetMocks();
+    
when(secondary.getLatestFileSlicesIncludingInflight(partitionPath)).thenThrow(new
 RuntimeException());
+    assertThrows(RuntimeException.class, () -> 
fsView.getLatestFileSlicesIncludingInflight(partitionPath));
+  }
+
   @Test
   public void testGetLatestUnCompactedFileSlices() {
     Stream<FileSlice> actual;
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 727daef0058..2a313f70461 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -253,6 +253,74 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test functional index update after initialization") {
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      withTempDir(tmp => {
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""create table $tableName (
+            id int,
+            name string,
+            price double,
+            ts long
+            ) using hudi
+            options (
+            primaryKey ='id',
+            type = 'mor',
+            preCombineField = 'ts',
+            hoodie.metadata.record.index.enable = 'true',
+            hoodie.datasource.write.recordkey.field = 'id'
+            )
+            partitioned by(ts)
+            location '$basePath'""".stripMargin)
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+
+        checkAnswer(s"select id, name from $tableName where from_unixtime(ts, 
'yyyy-MM-dd') = '1970-01-01'")(
+          Seq(1, "a1"),
+          Seq(2, "a2"),
+          Seq(3, "a3")
+        )
+        // create functional index
+        var createIndexSql = s"create index idx_datestr on $tableName using 
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')"
+        spark.sql(createIndexSql)
+        var metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(basePath)
+          .setConf(spark.sessionState.newHadoopConf())
+          .build()
+        var functionalIndexMetadata = 
metaClient.getFunctionalIndexMetadata.get()
+        assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
+        assertEquals("func_index_idx_datestr", 
functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
+        
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+        assertTrue(metaClient.getFunctionalIndexMetadata.isPresent)
+
+        // do another insert after initializing the index
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 10000000)")
+        // check query result
+        checkAnswer(s"select id, name from $tableName where from_unixtime(ts, 
'yyyy-MM-dd') = '1970-04-26'")(
+          Seq(4, "a4")
+        )
+
+        // Verify one can create more than one functional index
+        createIndexSql = s"create index name_lower on $tableName using 
column_stats(ts) options(func='identity')"
+        spark.sql(createIndexSql)
+        metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(basePath)
+          .setConf(spark.sessionState.newHadoopConf())
+          .build()
+        functionalIndexMetadata = metaClient.getFunctionalIndexMetadata.get()
+        assertEquals(2, functionalIndexMetadata.getIndexDefinitions.size())
+        assertEquals("func_index_name_lower", 
functionalIndexMetadata.getIndexDefinitions.get("func_index_name_lower").getIndexName)
+
+        // Ensure that both the indexes are tracked correctly in metadata 
partition config
+        val mdtPartitions = metaClient.getTableConfig.getMetadataPartitions
+        assertTrue(mdtPartitions.contains("func_index_name_lower") && 
mdtPartitions.contains("func_index_idx_datestr"))
+      })
+    }
+  }
+
   private def assertTableIdentifier(catalogTable: CatalogTable,
                                     expectedDatabaseName: String,
                                     expectedTableName: String): Unit = {
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index f17c5624084..8ca50462cd2 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -340,6 +340,14 @@ public class RequestHandler {
       writeValueAsString(ctx, dtos);
     }, true));
 
+    
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_INFLIGHT_URL, 
new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_PARTITION_SLICES_INFLIGHT", 1);
+      List<FileSliceDTO> dtos = 
sliceHandler.getLatestFileSlicesIncludingInflight(
+          
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, 
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
+          
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, 
String.class).getOrDefault(""));
+      writeValueAsString(ctx, dtos);
+    }, true));
+
     
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_STATELESS_URL, 
new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_PARTITION_SLICES_STATELESS", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesStateless(
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index 391145c5cf8..8af86de7b08 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -90,6 +90,11 @@ public class FileSliceHandler extends Handler {
         .collect(Collectors.toList());
   }
 
+  public List<FileSliceDTO> getLatestFileSlicesIncludingInflight(String 
basePath, String partitionPath) {
+    return 
viewManager.getFileSystemView(basePath).getLatestFileSlicesIncludingInflight(partitionPath).map(FileSliceDTO::fromFileSlice)
+        .collect(Collectors.toList());
+  }
+
   public List<FileSliceDTO> getLatestFileSlicesStateless(String basePath, 
String partitionPath) {
     return 
viewManager.getFileSystemView(basePath).getLatestFileSlicesStateless(partitionPath).map(FileSliceDTO::fromFileSlice)
         .collect(Collectors.toList());

Reply via email to