This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new be7120d6d91 [HUDI-7480] Fix functional index and avoid multiple
initializations (#10860)
be7120d6d91 is described below
commit be7120d6d91c1371cb26924c94eb7b3647ff520e
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Apr 9 16:24:29 2024 +0530
[HUDI-7480] Fix functional index and avoid multiple initializations (#10860)
This PR fixes two issues:
- When a functional index is updated, it was getting re-initialized. This
was happening because metadata partition type only contains the functional
index partition prefix while hoodie.properties contains the full name. Due to
discrepancy, the metadata writer thought there is a new index that needs to be
initialized. This is fixed in `HoodieTableConfig#isMetadataPartitionAvailable`.
- When a functional index is updated, it fetches latest file slices,
including the pending ones, in data partition. There was a typo in building the
filesystem view. Instead of passing data meta client, metadata meta client was
passed.
Added a test which covers both these scenarios.
---
.../metadata/HoodieBackedTableMetadataWriter.java | 24 +++++---
.../hudi/common/table/HoodieTableConfig.java | 8 ++-
.../hudi/common/table/HoodieTableMetaClient.java | 2 +-
.../table/view/AbstractTableFileSystemView.java | 15 ++++-
.../table/view/HoodieTableFileSystemView.java | 14 -----
.../table/view/PriorityBasedFileSystemView.java | 5 ++
.../view/RemoteHoodieTableFileSystemView.java | 13 +++++
.../common/table/view/TableFileSystemView.java | 8 +++
.../hudi/metadata/HoodieTableMetadataUtil.java | 5 +-
.../table/view/TestHoodieTableFileSystemView.java | 49 ++++++++++++++++
.../view/TestPriorityBasedFileSystemView.java | 26 +++++++++
.../hudi/command/index/TestFunctionalIndex.scala | 68 ++++++++++++++++++++++
.../hudi/timeline/service/RequestHandler.java | 8 +++
.../service/handlers/FileSliceHandler.java | 5 ++
14 files changed, 222 insertions(+), 28 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 99739947077..ca9c72ac601 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -97,7 +97,6 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
import static
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
@@ -434,7 +433,12 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
break;
case FUNCTIONAL_INDEX:
- fileGroupCountAndRecordsPair =
initializeFunctionalIndexPartition();
+ Set<String> functionalIndexPartitionsToInit =
getFunctionalIndexPartitionsToInit();
+ if (functionalIndexPartitionsToInit.isEmpty()) {
+ continue;
+ }
+ ValidationUtils.checkState(functionalIndexPartitionsToInit.size()
== 1, "Only one functional index at a time is supported for now");
+ fileGroupCountAndRecordsPair =
initializeFunctionalIndexPartition(functionalIndexPartitionsToInit.iterator().next());
break;
default:
throw new HoodieMetadataException("Unsupported MDT partition type:
" + partitionType);
@@ -520,9 +524,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
int
parallelism, Schema readerSchema,
SerializableConfiguration hadoopConf);
- private Pair<Integer, HoodieData<HoodieRecord>>
initializeFunctionalIndexPartition() throws Exception {
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeFunctionalIndexPartition(String indexName) throws Exception {
HoodieMetadataFileSystemView fsView = new
HoodieMetadataFileSystemView(dataMetaClient,
dataMetaClient.getActiveTimeline(), metadata);
- String indexName =
dataWriteConfig.getFunctionalIndexConfig().getIndexName();
HoodieFunctionalIndexDefinition indexDefinition =
getFunctionalIndexDefinition(indexName);
// Collect the list of latest file slices present in each partition
List<String> partitions = metadata.getAllPartitionPaths();
@@ -534,10 +537,17 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
int fileGroupCount =
dataWriteConfig.getMetadataConfig().getFunctionalIndexFileGroupCount();
int parallelism = Math.min(partitionFileSlicePairs.size(),
dataWriteConfig.getMetadataConfig().getFunctionalIndexParallelism());
- Schema readerSchema =
addMetadataFields(getProjectedSchemaForFunctionalIndex(indexDefinition,
dataMetaClient), dataWriteConfig.allowOperationMetadataField());
+ Schema readerSchema =
getProjectedSchemaForFunctionalIndex(indexDefinition, dataMetaClient);
return Pair.of(fileGroupCount,
getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition,
dataMetaClient, parallelism, readerSchema, hadoopConf));
}
+ private Set<String> getFunctionalIndexPartitionsToInit() {
+ Set<String> functionalIndexPartitions =
dataMetaClient.getFunctionalIndexMetadata().get().getIndexDefinitions().keySet();
+ Set<String> completedMetadataPartitions =
dataMetaClient.getTableConfig().getMetadataPartitions();
+ functionalIndexPartitions.removeAll(completedMetadataPartitions);
+ return functionalIndexPartitions;
+ }
+
private HoodieFunctionalIndexDefinition getFunctionalIndexDefinition(String
indexName) {
Option<HoodieFunctionalIndexMetadata> functionalIndexMetadata =
dataMetaClient.getFunctionalIndexMetadata();
if (functionalIndexMetadata.isPresent()) {
@@ -991,9 +1001,9 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
private HoodieData<HoodieRecord>
getFunctionalIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, String instantTime) throws Exception {
HoodieFunctionalIndexDefinition indexDefinition =
getFunctionalIndexDefinition(indexPartition);
List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
- HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient);
+ HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemView(dataMetaClient);
commitMetadata.getPartitionToWriteStats().forEach((dataPartition, value)
-> {
- List<FileSlice> fileSlices =
getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient,
Option.ofNullable(fsView), dataPartition);
+ List<FileSlice> fileSlices =
getPartitionLatestFileSlicesIncludingInflight(dataMetaClient,
Option.ofNullable(fsView), dataPartition);
fileSlices.forEach(fileSlice -> {
// Filter log files for the instant time and add to this partition
fileSlice pairs
List<HoodieLogFile> logFilesForInstant = fileSlice.getLogFiles()
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 3cac820d4a8..9b0b294d3e6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -737,11 +737,13 @@ public class HoodieTableConfig extends HoodieConfig {
/**
* Checks if metadata table is enabled and the specified partition has been
initialized.
*
- * @param partition The partition to check
+ * @param metadataPartitionType The metadata table partition type to check
* @returns true if the specific partition has been initialized, else
returns false.
*/
- public boolean isMetadataPartitionAvailable(MetadataPartitionType partition)
{
- return getMetadataPartitions().contains(partition.getPartitionPath());
+ public boolean isMetadataPartitionAvailable(MetadataPartitionType
metadataPartitionType) {
+ /*return getMetadataPartitions().stream().anyMatch(metadataPartition ->
+ metadataPartition.equals(metadataPartitionType.getPartitionPath()) ||
(FUNCTIONAL_INDEX.equals(metadataPartitionType) &&
metadataPartition.startsWith(FUNCTIONAL_INDEX.getPartitionPath())));*/
+ return
getMetadataPartitions().contains(metadataPartitionType.getPartitionPath());
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 6468d165568..f9f23bbbd2d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -224,7 +224,7 @@ public class HoodieTableMetaClient implements Serializable {
* Returns Option of {@link HoodieFunctionalIndexMetadata} from index
definition file if present, else returns empty Option.
*/
public Option<HoodieFunctionalIndexMetadata> getFunctionalIndexMetadata() {
- if (functionalIndexMetadata.isPresent()) {
+ if (functionalIndexMetadata.isPresent() &&
!functionalIndexMetadata.get().getIndexDefinitions().isEmpty()) {
return functionalIndexMetadata;
}
if (tableConfig.getIndexDefinitionPath().isPresent() &&
StringUtils.nonEmpty(tableConfig.getIndexDefinitionPath().get())) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index cdac0eeeb20..c1d60e1a33f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -451,7 +451,7 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
*
* @param partition partition to be loaded if not present
*/
- private void ensurePartitionLoadedCorrectly(String partition) {
+ protected void ensurePartitionLoadedCorrectly(String partition) {
ValidationUtils.checkArgument(!isClosed(), "View is already closed");
@@ -851,6 +851,19 @@ public abstract class AbstractTableFileSystemView
implements SyncableFileSystemV
}
}
+ public Stream<FileSlice> getLatestFileSlicesIncludingInflight(String
partitionPath) {
+ try {
+ readLock.lock();
+ ensurePartitionLoadedCorrectly(partitionPath);
+ return fetchAllStoredFileGroups(partitionPath)
+ .map(HoodieFileGroup::getLatestFileSlicesIncludingInflight)
+ .filter(Option::isPresent)
+ .map(Option::get);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
@Override
public final Stream<FileSlice> getLatestFileSlicesStateless(String
partitionStr) {
String partition = formatPartitionKey(partitionStr);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index 427258ff596..144e2bf8073 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -20,7 +20,6 @@ package org.apache.hudi.common.table.view;
import org.apache.hudi.common.model.BootstrapBaseFileMapping;
import org.apache.hudi.common.model.CompactionOperation;
-import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -421,19 +420,6 @@ public class HoodieTableFileSystemView extends
IncrementalTimelineSyncFileSystem
return Option.ofNullable(fgIdToReplaceInstants.get(fileGroupId));
}
- /**
- * Get the latest file slices for a given partition including the inflight
ones.
- *
- * @param partitionPath The partition path of interest
- * @return Stream of latest {@link FileSlice} in the partition path.
- */
- public Stream<FileSlice> fetchLatestFileSlicesIncludingInflight(String
partitionPath) {
- return fetchAllStoredFileGroups(partitionPath)
- .map(HoodieFileGroup::getLatestFileSlicesIncludingInflight)
- .filter(Option::isPresent)
- .map(Option::get);
- }
-
@Override
public void close() {
super.close();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index 1e4b1852d1b..8cfd6d64713 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -203,6 +203,11 @@ public class PriorityBasedFileSystemView implements
SyncableFileSystemView, Seri
return execute(partitionPath, preferredView::getLatestFileSlices,
secondaryView::getLatestFileSlices);
}
+ @Override
+ public Stream<FileSlice> getLatestFileSlicesIncludingInflight(String
partitionPath) {
+ return execute(partitionPath,
preferredView::getLatestFileSlicesIncludingInflight,
secondaryView::getLatestFileSlicesIncludingInflight);
+ }
+
@Override
public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
return execute(partitionPath, preferredView::getLatestFileSlicesStateless,
secondaryView::getLatestFileSlicesStateless);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 61c90c6eb02..94f73e70c79 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -68,6 +68,7 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
private static final String BASE_URL = "/v1/hoodie/view";
public static final String LATEST_PARTITION_SLICES_URL =
String.format("%s/%s", BASE_URL, "slices/partition/latest/");
+ public static final String LATEST_PARTITION_SLICES_INFLIGHT_URL =
String.format("%s/%s", BASE_URL, "slices/partition/latest/inflight/");
public static final String LATEST_PARTITION_SLICES_STATELESS_URL =
String.format("%s/%s", BASE_URL, "slices/partition/latest/stateless/");
public static final String LATEST_PARTITION_SLICE_URL =
String.format("%s/%s", BASE_URL, "slices/file/latest/");
public static final String LATEST_PARTITION_UNCOMPACTED_SLICES_URL =
@@ -338,6 +339,18 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
}
}
+ @Override
+ public Stream<FileSlice> getLatestFileSlicesIncludingInflight(String
partitionPath) {
+ Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+ try {
+ List<FileSliceDTO> dataFiles =
executeRequest(LATEST_PARTITION_SLICES_INFLIGHT_URL, paramsMap,
+ FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
+ return dataFiles.stream().map(FileSliceDTO::toFileSlice);
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
@Override
public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 87b3db142e6..76a928a0b5b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -107,6 +107,14 @@ public interface TableFileSystemView {
*/
Stream<FileSlice> getLatestFileSlices(String partitionPath);
+ /**
+ * Get the latest file slices for a given partition including the inflight
ones.
+ *
+ * @param partitionPath The partition path of interest
+ * @return Stream of latest {@link FileSlice} in the partition path.
+ */
+ Stream<FileSlice> getLatestFileSlicesIncludingInflight(String
partitionPath);
+
/**
* Stream all the latest file slices in the given partition
* without caching the file group mappings.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index e81cd63f66e..b67faddc5fb 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -125,6 +125,7 @@ import static java.util.stream.Collectors.toList;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
+import static org.apache.hudi.avro.HoodieAvroUtils.getSchemaForFields;
import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
import static
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
@@ -1081,7 +1082,7 @@ public class HoodieTableMetadataUtil {
Option<HoodieTableFileSystemView> fileSystemView,
String partition) {
HoodieTableFileSystemView fsView = fileSystemView.orElseGet(() ->
getFileSystemView(metaClient));
- Stream<FileSlice> fileSliceStream =
fsView.fetchLatestFileSlicesIncludingInflight(partition);
+ Stream<FileSlice> fileSliceStream =
fsView.getLatestFileSlicesIncludingInflight(partition);
return fileSliceStream
.sorted(Comparator.comparing(FileSlice::getFileId))
.collect(Collectors.toList());
@@ -1919,7 +1920,7 @@ public class HoodieTableMetadataUtil {
public static Schema
getProjectedSchemaForFunctionalIndex(HoodieFunctionalIndexDefinition
indexDefinition, HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
Schema tableSchema = schemaResolver.getTableAvroSchema();
- return HoodieAvroUtils.getSchemaForFields(tableSchema,
indexDefinition.getSourceFields());
+ return addMetadataFields(getSchemaForFields(tableSchema,
indexDefinition.getSourceFields()));
}
private static Path filePath(String basePath, String partition, String
filename) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index f5cee136d2e..80f403ce777 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -736,6 +736,55 @@ public class TestHoodieTableFileSystemView extends
HoodieCommonTestHarness {
}
}
+ @Test
+ public void testGetLatestFileSlicesIncludingInflight() throws Exception {
+ String partitionPath = "2016/05/01";
+ new File(basePath + "/" + partitionPath).mkdirs();
+ String fileId = UUID.randomUUID().toString();
+
+ String instantTime1 = "1";
+ String deltaInstantTime1 = "2";
+ String deltaInstantTime2 = "3";
+
+ String dataFileName = FSUtils.makeBaseFileName(instantTime1,
TEST_WRITE_TOKEN, fileId, BASE_FILE_EXTENSION);
+ new File(basePath + "/" + partitionPath + "/" +
dataFileName).createNewFile();
+ String fileName1 =
+ FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
deltaInstantTime1, 0, TEST_WRITE_TOKEN);
+ String fileName2 =
+ FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
deltaInstantTime2, 1, TEST_WRITE_TOKEN);
+ new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
+ new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
+ HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+ HoodieInstant instant1 = new HoodieInstant(true,
HoodieTimeline.COMMIT_ACTION, instantTime1);
+ HoodieInstant deltaInstant2 = new HoodieInstant(true,
HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime1);
+ HoodieInstant deltaInstant3 = new HoodieInstant(true,
HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime2);
+
+ // just commit instant1 and deltaInstant2, keep deltaInstant3 inflight
+ saveAsComplete(commitTimeline, instant1, Option.empty());
+ saveAsComplete(commitTimeline, deltaInstant2, Option.empty());
+ refreshFsView();
+
+ // getLatestFileSlices should return just 1 log file due to deltaInstant2
+
rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList()).forEach(fs
-> {
+ assertEquals(fs.getBaseInstantTime(), instantTime1);
+ assertTrue(fs.getBaseFile().isPresent());
+ assertEquals(fs.getBaseFile().get().getCommitTime(), instantTime1);
+ assertEquals(fs.getBaseFile().get().getFileId(), fileId);
+ assertEquals(fs.getBaseFile().get().getPath(), "file:" + basePath + "/"
+ partitionPath + "/" + dataFileName);
+ assertEquals(fs.getLogFiles().count(), 1);
+ });
+
+ // getLatestFileSlicesIncludingInflight should return both the log files
+
rtView.getLatestFileSlicesIncludingInflight(partitionPath).collect(Collectors.toList()).forEach(fs
-> {
+ assertEquals(fs.getBaseInstantTime(), instantTime1);
+ assertTrue(fs.getBaseFile().isPresent());
+ assertEquals(fs.getBaseFile().get().getCommitTime(), instantTime1);
+ assertEquals(fs.getBaseFile().get().getFileId(), fileId);
+ assertEquals(fs.getBaseFile().get().getPath(), "file:" + basePath + "/"
+ partitionPath + "/" + dataFileName);
+ assertEquals(fs.getLogFiles().count(), 2);
+ });
+ }
+
/**
* Helper method to test Views in the presence of concurrent compaction.
*
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
index 1e2b8e0c35e..eb30e1387c1 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java
@@ -357,6 +357,32 @@ public class TestPriorityBasedFileSystemView {
});
}
+ @Test
+ public void testGetLatestFileSlicesIncludingInflight() {
+ Stream<FileSlice> actual;
+ Stream<FileSlice> expected = testFileSliceStream;
+ String partitionPath = "/table2";
+
+
when(primary.getLatestFileSlicesIncludingInflight(partitionPath)).thenReturn(testFileSliceStream);
+ actual = fsView.getLatestFileSlicesIncludingInflight(partitionPath);
+ assertEquals(expected, actual);
+
+ resetMocks();
+
when(primary.getLatestFileSlicesIncludingInflight(partitionPath)).thenThrow(new
RuntimeException());
+
when(secondary.getLatestFileSlicesIncludingInflight(partitionPath)).thenReturn(testFileSliceStream);
+ actual = fsView.getLatestFileSlicesIncludingInflight(partitionPath);
+ assertEquals(expected, actual);
+
+ resetMocks();
+
when(secondary.getLatestFileSlicesIncludingInflight(partitionPath)).thenReturn(testFileSliceStream);
+ actual = fsView.getLatestFileSlicesIncludingInflight(partitionPath);
+ assertEquals(expected, actual);
+
+ resetMocks();
+
when(secondary.getLatestFileSlicesIncludingInflight(partitionPath)).thenThrow(new
RuntimeException());
+ assertThrows(RuntimeException.class, () ->
fsView.getLatestFileSlicesIncludingInflight(partitionPath));
+ }
+
@Test
public void testGetLatestUnCompactedFileSlices() {
Stream<FileSlice> actual;
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
index 727daef0058..2a313f70461 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestFunctionalIndex.scala
@@ -253,6 +253,74 @@ class TestFunctionalIndex extends HoodieSparkSqlTestBase {
}
}
+ test("Test functional index update after initialization") {
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ withTempDir(tmp => {
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""create table $tableName (
+ id int,
+ name string,
+ price double,
+ ts long
+ ) using hudi
+ options (
+ primaryKey ='id',
+ type = 'mor',
+ preCombineField = 'ts',
+ hoodie.metadata.record.index.enable = 'true',
+ hoodie.datasource.write.recordkey.field = 'id'
+ )
+ partitioned by(ts)
+ location '$basePath'""".stripMargin)
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+
+ checkAnswer(s"select id, name from $tableName where from_unixtime(ts,
'yyyy-MM-dd') = '1970-01-01'")(
+ Seq(1, "a1"),
+ Seq(2, "a2"),
+ Seq(3, "a3")
+ )
+ // create functional index
+ var createIndexSql = s"create index idx_datestr on $tableName using
column_stats(ts) options(func='from_unixtime', format='yyyy-MM-dd')"
+ spark.sql(createIndexSql)
+ var metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(spark.sessionState.newHadoopConf())
+ .build()
+ var functionalIndexMetadata =
metaClient.getFunctionalIndexMetadata.get()
+ assertEquals(1, functionalIndexMetadata.getIndexDefinitions.size())
+ assertEquals("func_index_idx_datestr",
functionalIndexMetadata.getIndexDefinitions.get("func_index_idx_datestr").getIndexName)
+
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains("func_index_idx_datestr"))
+ assertTrue(metaClient.getFunctionalIndexMetadata.isPresent)
+
+ // do another insert after initializing the index
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 10000000)")
+ // check query result
+ checkAnswer(s"select id, name from $tableName where from_unixtime(ts,
'yyyy-MM-dd') = '1970-04-26'")(
+ Seq(4, "a4")
+ )
+
+ // Verify one can create more than one functional index
+ createIndexSql = s"create index name_lower on $tableName using
column_stats(ts) options(func='identity')"
+ spark.sql(createIndexSql)
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(spark.sessionState.newHadoopConf())
+ .build()
+ functionalIndexMetadata = metaClient.getFunctionalIndexMetadata.get()
+ assertEquals(2, functionalIndexMetadata.getIndexDefinitions.size())
+ assertEquals("func_index_name_lower",
functionalIndexMetadata.getIndexDefinitions.get("func_index_name_lower").getIndexName)
+
+ // Ensure that both the indexes are tracked correctly in metadata
partition config
+ val mdtPartitions = metaClient.getTableConfig.getMetadataPartitions
+ assertTrue(mdtPartitions.contains("func_index_name_lower") &&
mdtPartitions.contains("func_index_idx_datestr"))
+ })
+ }
+ }
+
private def assertTableIdentifier(catalogTable: CatalogTable,
expectedDatabaseName: String,
expectedTableName: String): Unit = {
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index f17c5624084..8ca50462cd2 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -340,6 +340,14 @@ public class RequestHandler {
writeValueAsString(ctx, dtos);
}, true));
+
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_INFLIGHT_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_PARTITION_SLICES_INFLIGHT", 1);
+ List<FileSliceDTO> dtos =
sliceHandler.getLatestFileSlicesIncludingInflight(
+
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
+
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ writeValueAsString(ctx, dtos);
+ }, true));
+
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_STATELESS_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_SLICES_STATELESS", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesStateless(
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
index 391145c5cf8..8af86de7b08 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java
@@ -90,6 +90,11 @@ public class FileSliceHandler extends Handler {
.collect(Collectors.toList());
}
+ public List<FileSliceDTO> getLatestFileSlicesIncludingInflight(String
basePath, String partitionPath) {
+ return
viewManager.getFileSystemView(basePath).getLatestFileSlicesIncludingInflight(partitionPath).map(FileSliceDTO::fromFileSlice)
+ .collect(Collectors.toList());
+ }
+
public List<FileSliceDTO> getLatestFileSlicesStateless(String basePath,
String partitionPath) {
return
viewManager.getFileSystemView(basePath).getLatestFileSlicesStateless(partitionPath).map(FileSliceDTO::fromFileSlice)
.collect(Collectors.toList());