CTTY commented on code in PR #12407:
URL: https://github.com/apache/hudi/pull/12407#discussion_r1868589645
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -84,23 +119,85 @@ public interface HoodieStorageStrategy extends
Serializable {
/**
* Return a storage location for the given filename.
*
- * @param fileId data file ID
+ * @param path fileName
Review Comment:
Is this a path or just the file name? Would appreciate if we can add an
example of the exact string that gets passed in here.
Also could you help me understand why `instantTime` is needed in this API by
adding it to the comment?
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -84,23 +119,85 @@ public interface HoodieStorageStrategy extends
Serializable {
/**
* Return a storage location for the given filename.
*
- * @param fileId data file ID
+ * @param path fileName
* @return a storage location string for a data file
*/
- String storageLocation(String fileId);
+ StoragePath storageLocation(String path, String instantTime);
/**
- * Return a storage location for the given partition and filename.
+ * Return all possible StoragePaths
*
- * @param partitionPath partition path for the file
- * @param fileId data file ID
- * @return a storage location string for a data file
+ * @param partitionPath
+ * @param checkExist check if StoragePath is truly existed or not.
+ * @return a st of storage partition path
+ */
+ Set<StoragePath> getAllLocations(String partitionPath, boolean checkExist);
Review Comment:
Where can this API be useful?
I think we need to think twice about it. Adding this API means Hudi should
be able to infer relative partition locations solely based on partition path.
And for `Object Store Strategy` this means all the data under the same
partition would be stored under the same physical location again, because we
can only calculate the hash based on two arguments that got passed in.
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -198,46 +419,68 @@ for metadata table to be populated.
4. If there is an error reading from Metadata table, we will not fall back
listing from file system.
+After enabling the Federated Storage Layout feature, under certain strategies
such as the "data cache layer,"
+data from different lake tables may be stored on different physical media,
resulting in different schemes.
+For example, cache layer data may be stored on hdfs://ns1/, while persistent
layer data is stored on hdfs://ns2/.
+In this case, we need to add a new field named "scheme" in MDT
HoodieMetadataFileInfo to store the scheme information for different files,
+which will be used for path restoration.
+
+```avro schema
+ {
+ "doc": "Contains information about partitions and files within the
dataset",
+ "name": "filesystemMetadata",
+ "type": [
+ "null",
+ {
+ "type": "map",
+ "values": {
+ "type": "record",
+ "name": "HoodieMetadataFileInfo",
+ "fields": [
+ {
+ "name": "size",
+ "type": "long",
+ "doc": "Size of the file"
+ },
+ {
+ "name": "isDeleted",
+ "type": "boolean",
+ "doc": "True if this file has been deleted"
+ },
+ {
+ "name":"scheme",
Review Comment:
We should use `prefix` instead to avoid confusion. The "scheme" here can
also contain the root path/authority
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -178,6 +275,130 @@ The hashing function should be made user configurable for
use cases like bucketi
sub-partitioning/re-hash to reduce the number of hash prefixes. Having too
many unique hash prefixes
would make files too dispersed, and affect performance on other operations
such as listing.
+### Case2: Hudi Storage Cache Layer
+
+The cache layer of a lake table is a specific implementation scenario for
Federated Storage Layout in hudi tables.
+It divides the physical storage of the lake table into a high-performance
HDFS-based cache layer and a shared HDFS-based persistent layer.
+Hot data is first written to the cache layer and later moved to the persistent
layer through table services like Compaction and Clustering.
+This setup addresses the strong demands for performance and stability in
scenarios involving massive data ingestion into the lake.
+It is important to note that once data is written to the cache layer and
committed, it becomes visible to downstream processes, regardless of when
+the "relocation work" starts or finishes. Additionally, since the data
relocation from the cache layer to the persistent layer leverages the lake
table's
+own Compaction and Clustering capabilities, this process adheres to the lake
table's commit mechanism and MVCC snapshot isolation design. Therefore,
+after enabling the cache layer, the lake table maintains atomicity,
transactional guarantees, and Exactly Once semantics.
+
+Below is a comparison between the normal read/write process of a lake table
and the process after enabling the lake table cache layer.
+Green arrows indicate data reads, and red arrows indicate data writes. Before
enabling the cache layer, the compute engine directly writes
+data to the shared HDFS and commits, including Parquet base files, log files,
and metadata files. The compute engine queries the lake table
+by directly reading data from the shared HDFS. Additionally, the lake table
services for Clustering and Compaction also directly query data from
+the shared HDFS, process it, and write it back to the shared HDFS. After
enabling the lake table cache layer, the compute engine first writes hot
+data to the high-performance HDFS and commits, including Parquet files and log
files. During queries, a unified logical view of both the cache layer
+and the persistent layer is constructed to meet query demands. The Clustering
and Compaction table services, while performing regular lake table file
+organization, also facilitate data relocation from the cache layer to the
persistent layer. Notably, regardless of when Clustering and Compaction jobs
+start or finish, the data visible to downstream processes is always complete
and timely.
+
+Original Read/Write workflow
+
+
+Read/Write workflow with hudi cache layer enabled
+
+
+#### HoodieCacheLayerStorageStrategy
+Based on the HoodieActiveTimeline and the current write instant, determine the
specific write path. For common commit operations
+in COW (Copy on Write) tables and delta commit operations in MOR (Merge on
Read) tables, we generate a CacheLayer-related Storage
+Path to write/read. This type of I/O is targeted at the cache layer.
+
+As for commit action in mor table and replace commit in cow table, we will
generate a persistent Storage Path which will let
+Compaction/Clustering do the data migration works from cache layer to
persistent layer
+
+Note: It is required that Clustering is enabled for COW tables and Compaction
is enabled for MOR tables; otherwise, there is a
+risk of storage overflow in the cache layer.
+
+```java
+/**
+ * When using Storage Cache Layer make sure that table service is enabled :
+ * 1. MOR + Upsert + Compaction
+ * 2. COW + Insert + Clustering
+ */
+public class HoodieCacheLayerStorageStrategy extends
HoodieDefaultStorageStrategy {
Review Comment:
We should omit the implementation details to focus on discussing the
abstraction API in this RFC
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -198,46 +419,68 @@ for metadata table to be populated.
4. If there is an error reading from Metadata table, we will not fall back
listing from file system.
+After enabling the Federated Storage Layout feature, under certain strategies
such as the "data cache layer,"
+data from different lake tables may be stored on different physical media,
resulting in different schemes.
+For example, cache layer data may be stored on hdfs://ns1/, while persistent
layer data is stored on hdfs://ns2/.
+In this case, we need to add a new field named "scheme" in MDT
HoodieMetadataFileInfo to store the scheme information for different files,
+which will be used for path restoration.
+
+```avro schema
+ {
+ "doc": "Contains information about partitions and files within the
dataset",
+ "name": "filesystemMetadata",
+ "type": [
+ "null",
+ {
+ "type": "map",
+ "values": {
+ "type": "record",
+ "name": "HoodieMetadataFileInfo",
+ "fields": [
+ {
+ "name": "size",
+ "type": "long",
+ "doc": "Size of the file"
+ },
+ {
+ "name": "isDeleted",
+ "type": "boolean",
+ "doc": "True if this file has been deleted"
+ },
+ {
+ "name":"scheme",
+ "type": ["null","string"],
+ "default":null
+ }
+ ]
+ }
+ }
+ ]
+ }
+```
+
+Note: For lake tables that do not have the Federated Storage Layout enabled,
the value of this "scheme" field will be null.
+
### Integration
This section mainly describes how storage strategy is integrated with other
components and how read/write
-would look like from Hudi side with object storage layout.
-
-We propose integrating the storage strategy at the filesystem level,
specifically within `HoodieWrapperFileSystem`.
-This way, only file read/write operations undergo path conversion and we can
limit the usage of
-storage strategy to only filesystem level so other upper-level components
don't need to be aware of physical paths.
-
-This also mandates that `HoodieWrapperFileSystem` is the filesystem of choice
for all upper-level Hudi components.
-Getting filesystem from `Path` or such won't be allowed anymore as using raw
filesystem may not reach
-to physical locations without storage strategy. Hudi components can simply
call `HoodieMetaClient#getFs`
-to get `HoodieWrapperFileSystem`, and this needs to be the only allowed way
for any filesystem-related operation.
-The only exception is when we need to interact with metadata that's still
stored under the original table path,
-and we should call `HoodieMetaClient#getRawFs` in this case so
`HoodieMetaClient` can still be the single entry
-for getting filesystem.
-
-
-
-When conducting a read operation, Hudi would:
-1. Access filesystem view, `HoodieMetadataFileSystemView` specifically
-2. Scan metadata table via filesystem view to compose `HoodieMetadataPayload`
-3. Call `HoodieMetadataPayload#getFileStatuses` and employ
`HoodieWrapperFileSystem` to get
-file statuses with physical locations
-
-This flow can be concluded in the chart below.
-
-
-
-#### Considerations
-- Path conversion happens on the fly when reading/writing files. This saves
Hudi from storing physical locations,
-and adds the cost of hashing, but the performance burden should be negligible.
-- Since table path and data path will most likely have different top-level
folders/authorities,
-`HoodieWrapperFileSystem` should maintain at least two `FileSystem` objects:
one to access table path and another
-to access storage path. `HoodieWrapperFileSystem` should intelligently tell if
it needs
-to convert the path by checking the path on the fly.
-- When using Hudi file reader/writer implementation, we will need to pass
`HoodieWrapperFileSystem` down
-to parent reader. For instance, when using `HoodieAvroHFileReader`, we will
need to pass `HoodieWrapperFileSystem`
-to `HFile.Reader` so it can have access to storage strategy. If reader/writer
doesn't take filesystem
-directly (e.g. `ParquetFileReader` only takes `Configuration` and `Path` for
reading), then we will
-need to register `HoodieWrapperFileSystem` to `Configuration` so it can be
initialized/used later.
+would look like from Hudi side with Federated Storage Layout.
+
+We already have the abstractions HoodieStorage, StoragePath, and
HoodieWrapperFileSystem. Here, we need to add a
Review Comment:
It would be cleaner if we could not use `HoodieWrapperFileSystem` at all and
concentrate all the logic to `HoodieStorage` or `StorageStrategy`
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -18,33 +18,68 @@
# RFC-60: Federated Storage Layout
## Proposers
+- @zhangyue19921010
+- @CTTY
- @umehrot2
## Approvers
- @vinoth
- @shivnarayan
+- @yihua
## Status
JIRA:
[https://issues.apache.org/jira/browse/HUDI-3625](https://issues.apache.org/jira/browse/HUDI-3625)
## Abstract
+In this RFC, we will support the Federated Storage Layout for Hudi tables,
enabling Hudi to support multiple pluggable physical
+storage systems. By combining Hudi's own metadata, we can construct logical
table views and expose these views externally,
+making them transparent to the engine.
+
+
+After implementing the Hudi Federated Storage Layout, we can develop many
interesting new features for Hudi lake tables based on this, such as:
+
+### Object Store Optimized Layout
As you scale your Apache Hudi workloads over cloud object stores like Amazon
S3, there is potential of hitting request
throttling limits which in-turn impacts performance. In this RFC, we are
proposing to support an alternate storage
layout that is optimized for Amazon S3 and other cloud object stores, which
helps achieve maximum throughput and
significantly reduce throttling.
+
+
+### Hudi Storage Cache Layer
+The Hudi lake table data cache layer involves dividing the lake table physical
storage into a high-performance HDFS-based data
+cache layer and a shared HDFS-based data persistence layer. Hot data is
initially written to the cache layer and later moved to
+the persistence layer through table services like Compaction and Clustering.
This approach meets the strong demands for performance
+and stability in scenarios involving massive data ingestion into the lake. It
is important to note that once data is written to the
+cache layer and committed, it becomes visible to downstream consumers,
regardless of when the subsequent "moving operations" start
+or finish, ensuring data visibility and timeliness are unaffected.
Additionally, since the data movement from the cache layer to
+the persistence layer leverages the lake table's own Compaction and Clustering
table service capabilities, this process adheres to
+the lake table's commit mechanism and MVCC snapshot isolation design.
Therefore, with the data cache layer enabled, the lake table
+maintains atomicity, transactional guarantees, and Exactly Once semantics.
+
+
+
+### Hudi Table Second-Level Latency
+When data is ingested into the lake, it is first written to data files and
metadata, and only becomes visible after the transaction
+is successfully committed. Since writing data files is relatively slow, it
typically involves minute-level commits. Here, the capabilities
+of the Federated Storage Layout can be extended by writing data to high-speed
storage systems like Kafka, HBase, or Redis, and recording
+metadata such as offsets or keys in the lake table's metadata. During reads,
the content from both persistent storage and high-speed storage
+can be combined, enabling Hudi to achieve second-level latency.
+
In addition, we are proposing an interface that would allow users to implement
their own custom strategy to allow them
to distribute the data files across cloud stores, hdfs or on prem based on
their specific use-cases.
+
Review Comment:
nit: The image size is much smaller comparing to other images, can we fix
this
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -84,23 +119,85 @@ public interface HoodieStorageStrategy extends
Serializable {
/**
* Return a storage location for the given filename.
*
- * @param fileId data file ID
+ * @param path fileName
* @return a storage location string for a data file
*/
- String storageLocation(String fileId);
+ StoragePath storageLocation(String path, String instantTime);
/**
- * Return a storage location for the given partition and filename.
+ * Return all possible StoragePaths
*
- * @param partitionPath partition path for the file
- * @param fileId data file ID
- * @return a storage location string for a data file
+ * @param partitionPath
+ * @param checkExist check if StoragePath is truly existed or not.
+ * @return a st of storage partition path
+ */
+ Set<StoragePath> getAllLocations(String partitionPath, boolean checkExist);
+
+ /**
+ * Return RelativePath base on path and locations.
+ *
+ * @param path
+ * @return relative path
+ */
+ String getRelativePath(Path path);
+}
+```
+
+```java
+public class HoodieStorageStrategyFactory {
Review Comment:
We can omit the factory class in RFC to focus on the abstraction
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -84,23 +119,85 @@ public interface HoodieStorageStrategy extends
Serializable {
/**
* Return a storage location for the given filename.
*
- * @param fileId data file ID
+ * @param path fileName
* @return a storage location string for a data file
*/
- String storageLocation(String fileId);
+ StoragePath storageLocation(String path, String instantTime);
/**
- * Return a storage location for the given partition and filename.
+ * Return all possible StoragePaths
*
- * @param partitionPath partition path for the file
- * @param fileId data file ID
- * @return a storage location string for a data file
+ * @param partitionPath
+ * @param checkExist check if StoragePath is truly existed or not.
+ * @return a st of storage partition path
+ */
+ Set<StoragePath> getAllLocations(String partitionPath, boolean checkExist);
+
+ /**
+ * Return RelativePath base on path and locations.
+ *
+ * @param path
+ * @return relative path
+ */
+ String getRelativePath(Path path);
+}
+```
+
+```java
+public class HoodieStorageStrategyFactory {
+ private HoodieStorageStrategyFactory() {
+ }
+
+ public static HoodieStorageStrategy getInstant(HoodieTableMetaClient
metaClient, Boolean reset) {
Review Comment:
nit: Do you mean `getInstance`? Or we can just say `getStorageStrategy`.
`reset` can be replaced with `useDefault`
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -18,33 +18,68 @@
# RFC-60: Federated Storage Layout
## Proposers
+- @zhangyue19921010
+- @CTTY
- @umehrot2
## Approvers
- @vinoth
- @shivnarayan
+- @yihua
## Status
JIRA:
[https://issues.apache.org/jira/browse/HUDI-3625](https://issues.apache.org/jira/browse/HUDI-3625)
## Abstract
+In this RFC, we will support the Federated Storage Layout for Hudi tables,
enabling Hudi to support multiple pluggable physical
+storage systems. By combining Hudi's own metadata, we can construct logical
table views and expose these views externally,
+making them transparent to the engine.
+
+
+After implementing the Hudi Federated Storage Layout, we can develop many
interesting new features for Hudi lake tables based on this, such as:
+
+### Object Store Optimized Layout
As you scale your Apache Hudi workloads over cloud object stores like Amazon
S3, there is potential of hitting request
throttling limits which in-turn impacts performance. In this RFC, we are
proposing to support an alternate storage
layout that is optimized for Amazon S3 and other cloud object stores, which
helps achieve maximum throughput and
significantly reduce throttling.
+
+
+### Hudi Storage Cache Layer
+The Hudi lake table data cache layer involves dividing the lake table physical
storage into a high-performance HDFS-based data
+cache layer and a shared HDFS-based data persistence layer. Hot data is
initially written to the cache layer and later moved to
+the persistence layer through table services like Compaction and Clustering.
This approach meets the strong demands for performance
+and stability in scenarios involving massive data ingestion into the lake. It
is important to note that once data is written to the
+cache layer and committed, it becomes visible to downstream consumers,
regardless of when the subsequent "moving operations" start
+or finish, ensuring data visibility and timeliness are unaffected.
Additionally, since the data movement from the cache layer to
+the persistence layer leverages the lake table's own Compaction and Clustering
table service capabilities, this process adheres to
+the lake table's commit mechanism and MVCC snapshot isolation design.
Therefore, with the data cache layer enabled, the lake table
+maintains atomicity, transactional guarantees, and Exactly Once semantics.
+
+
+
+### Hudi Table Second-Level Latency
Review Comment:
CMIIW: I think the low-level logic of this case is very similar to the cache
layer, which is splitting storage into a cache level and a persistent level.
The only difference is that in the third case, cache layer's data would be
visible to consumers only after being persisted.
If so, can we combining these two cases into one by abstracting them more?
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -84,23 +119,85 @@ public interface HoodieStorageStrategy extends
Serializable {
/**
* Return a storage location for the given filename.
*
- * @param fileId data file ID
+ * @param path fileName
* @return a storage location string for a data file
*/
- String storageLocation(String fileId);
+ StoragePath storageLocation(String path, String instantTime);
/**
- * Return a storage location for the given partition and filename.
+ * Return all possible StoragePaths
*
- * @param partitionPath partition path for the file
- * @param fileId data file ID
- * @return a storage location string for a data file
+ * @param partitionPath
+ * @param checkExist check if StoragePath is truly existed or not.
+ * @return a st of storage partition path
+ */
+ Set<StoragePath> getAllLocations(String partitionPath, boolean checkExist);
+
+ /**
+ * Return RelativePath base on path and locations.
+ *
+ * @param path
+ * @return relative path
+ */
+ String getRelativePath(Path path);
Review Comment:
Change to `StoragePath`
Is the `path` here the same as the `path` in `storageLocation`?
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -178,6 +275,130 @@ The hashing function should be made user configurable for
use cases like bucketi
sub-partitioning/re-hash to reduce the number of hash prefixes. Having too
many unique hash prefixes
would make files too dispersed, and affect performance on other operations
such as listing.
+### Case2: Hudi Storage Cache Layer
+
+The cache layer of a lake table is a specific implementation scenario for
Federated Storage Layout in hudi tables.
+It divides the physical storage of the lake table into a high-performance
HDFS-based cache layer and a shared HDFS-based persistent layer.
+Hot data is first written to the cache layer and later moved to the persistent
layer through table services like Compaction and Clustering.
+This setup addresses the strong demands for performance and stability in
scenarios involving massive data ingestion into the lake.
+It is important to note that once data is written to the cache layer and
committed, it becomes visible to downstream processes, regardless of when
+the "relocation work" starts or finishes. Additionally, since the data
relocation from the cache layer to the persistent layer leverages the lake
table's
+own Compaction and Clustering capabilities, this process adheres to the lake
table's commit mechanism and MVCC snapshot isolation design. Therefore,
+after enabling the cache layer, the lake table maintains atomicity,
transactional guarantees, and Exactly Once semantics.
+
+Below is a comparison between the normal read/write process of a lake table
and the process after enabling the lake table cache layer.
+Green arrows indicate data reads, and red arrows indicate data writes. Before
enabling the cache layer, the compute engine directly writes
+data to the shared HDFS and commits, including Parquet base files, log files,
and metadata files. The compute engine queries the lake table
+by directly reading data from the shared HDFS. Additionally, the lake table
services for Clustering and Compaction also directly query data from
+the shared HDFS, process it, and write it back to the shared HDFS. After
enabling the lake table cache layer, the compute engine first writes hot
+data to the high-performance HDFS and commits, including Parquet files and log
files. During queries, a unified logical view of both the cache layer
+and the persistent layer is constructed to meet query demands. The Clustering
and Compaction table services, while performing regular lake table file
+organization, also facilitate data relocation from the cache layer to the
persistent layer. Notably, regardless of when Clustering and Compaction jobs
+start or finish, the data visible to downstream processes is always complete
and timely.
+
+Original Read/Write workflow
+
+
+Read/Write workflow with hudi cache layer enabled
+
+
+#### HoodieCacheLayerStorageStrategy
+Based on the HoodieActiveTimeline and the current write instant, determine the
specific write path. For common commit operations
+in COW (Copy on Write) tables and delta commit operations in MOR (Merge on
Read) tables, we generate a CacheLayer-related Storage
+Path to write/read. This type of I/O is targeted at the cache layer.
+
+As for commit action in mor table and replace commit in cow table, we will
generate a persistent Storage Path which will let
+Compaction/Clustering do the data migration works from cache layer to
persistent layer
+
+Note: It is required that Clustering is enabled for COW tables and Compaction
is enabled for MOR tables; otherwise, there is a
+risk of storage overflow in the cache layer.
+
+```java
+/**
+ * When using Storage Cache Layer make sure that table service is enabled :
+ * 1. MOR + Upsert + Compaction
+ * 2. COW + Insert + Clustering
+ */
+public class HoodieCacheLayerStorageStrategy extends
HoodieDefaultStorageStrategy {
+
+ private HoodieTableType tableType;
+ private String hoodieStorageStrategyModifyTime;
Review Comment:
Is this going to be updated often?
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -84,23 +119,85 @@ public interface HoodieStorageStrategy extends
Serializable {
/**
* Return a storage location for the given filename.
*
- * @param fileId data file ID
+ * @param path fileName
* @return a storage location string for a data file
*/
- String storageLocation(String fileId);
+ StoragePath storageLocation(String path, String instantTime);
/**
- * Return a storage location for the given partition and filename.
+ * Return all possible StoragePaths
*
- * @param partitionPath partition path for the file
- * @param fileId data file ID
- * @return a storage location string for a data file
+ * @param partitionPath
+ * @param checkExist check if StoragePath is truly existed or not.
+ * @return a st of storage partition path
+ */
+ Set<StoragePath> getAllLocations(String partitionPath, boolean checkExist);
+
+ /**
+ * Return RelativePath base on path and locations.
+ *
+ * @param path
+ * @return relative path
+ */
+ String getRelativePath(Path path);
+}
+```
+
+```java
+public class HoodieStorageStrategyFactory {
+ private HoodieStorageStrategyFactory() {
+ }
+
+ public static HoodieStorageStrategy getInstant(HoodieTableMetaClient
metaClient, Boolean reset) {
+ HoodieTableConfig config = metaClient.getTableConfig();
+ if (reset) {
+ return getInstant(HoodieStorageStrategyType.DEFAULT.value,
metaClient.getBasePath(), config.getStoragePath(), metaClient);
+ }
+ return getInstant(config.getStorageStrategy(), metaClient.getBasePath(),
config.getStoragePath(), metaClient);
+ }
+
+ public static HoodieStorageStrategy getInstant(HoodieTableMetaClient
metaClient) {
+ return getInstant(metaClient, false);
+ }
+
+ /**
+ * Just for HoodieParquetInputFormatBase
+ * @param config
+ * @param basePath
+ * @return
*/
- String storageLocation(String partitionPath, String fileId);
+ public static HoodieStorageStrategy getInstant(HoodieTableConfig config,
String basePath, Boolean reset) {
+ if (reset) {
+ return getInstant(HoodieStorageStrategyType.DEFAULT.value, basePath,
config.getStoragePath(), null);
+ }
+ return getInstant(config.getStorageStrategy(), basePath,
config.getStoragePath(), null);
+ }
+
+ private static HoodieStorageStrategy getInstant(
+ String storageStrategyClass,
+ String basePath,
+ String storagePath, HoodieTableMetaClient metaClient) {
+ return (HoodieStorageStrategy)
ReflectionUtils.loadClass(storageStrategyClass,
+ basePath, storagePath, metaClient == null ? Option.empty() :
Option.of(metaClient));
+ }
}
```
-### Generating File Paths for Object Store Optimized Layout
+```java
+public enum HoodieStorageStrategyType {
Review Comment:
Same here, we can omit this class to focus on the API abstraction
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -84,23 +119,85 @@ public interface HoodieStorageStrategy extends
Serializable {
/**
* Return a storage location for the given filename.
*
- * @param fileId data file ID
+ * @param path fileName
* @return a storage location string for a data file
*/
- String storageLocation(String fileId);
+ StoragePath storageLocation(String path, String instantTime);
/**
- * Return a storage location for the given partition and filename.
+ * Return all possible StoragePaths
*
- * @param partitionPath partition path for the file
- * @param fileId data file ID
- * @return a storage location string for a data file
+ * @param partitionPath
+ * @param checkExist check if StoragePath is truly existed or not.
+ * @return a st of storage partition path
+ */
+ Set<StoragePath> getAllLocations(String partitionPath, boolean checkExist);
+
+ /**
+ * Return RelativePath base on path and locations.
+ *
+ * @param path
+ * @return relative path
+ */
+ String getRelativePath(Path path);
+}
+```
+
+```java
+public class HoodieStorageStrategyFactory {
+ private HoodieStorageStrategyFactory() {
+ }
+
+ public static HoodieStorageStrategy getInstant(HoodieTableMetaClient
metaClient, Boolean reset) {
+ HoodieTableConfig config = metaClient.getTableConfig();
+ if (reset) {
+ return getInstant(HoodieStorageStrategyType.DEFAULT.value,
metaClient.getBasePath(), config.getStoragePath(), metaClient);
+ }
+ return getInstant(config.getStorageStrategy(), metaClient.getBasePath(),
config.getStoragePath(), metaClient);
+ }
+
+ public static HoodieStorageStrategy getInstant(HoodieTableMetaClient
metaClient) {
+ return getInstant(metaClient, false);
+ }
+
+ /**
+ * Just for HoodieParquetInputFormatBase
+ * @param config
+ * @param basePath
+ * @return
*/
- String storageLocation(String partitionPath, String fileId);
+ public static HoodieStorageStrategy getInstant(HoodieTableConfig config,
String basePath, Boolean reset) {
+ if (reset) {
+ return getInstant(HoodieStorageStrategyType.DEFAULT.value, basePath,
config.getStoragePath(), null);
+ }
+ return getInstant(config.getStorageStrategy(), basePath,
config.getStoragePath(), null);
+ }
+
+ private static HoodieStorageStrategy getInstant(
+ String storageStrategyClass,
+ String basePath,
+ String storagePath, HoodieTableMetaClient metaClient) {
+ return (HoodieStorageStrategy)
ReflectionUtils.loadClass(storageStrategyClass,
+ basePath, storagePath, metaClient == null ? Option.empty() :
Option.of(metaClient));
+ }
}
```
-### Generating File Paths for Object Store Optimized Layout
+```java
+public enum HoodieStorageStrategyType {
+ DEFAULT(HoodieDefaultStorageStrategy.class.getName()),
+ CACHE_LAYER(HoodieCacheLayerStorageStrategy.class.getName()),
+ OBJECT_STORAGE_STRATEGY(ObjectStorageStrategy.class.getName());
+
+ public final String value;
+
+ HoodieStorageStrategyType(String strategy) {
+ this.value = strategy;
+ }
+}
+```
+
+### Case1: Generating File Paths for Object Store Optimized Layout
Review Comment:
Could you help update the `Case 1` section as well? I think some of the
assumptions we made originally may not hold true anymore
##########
rfc/rfc-60/rfc-60.md:
##########
@@ -178,6 +275,130 @@ The hashing function should be made user configurable for
use cases like bucketi
sub-partitioning/re-hash to reduce the number of hash prefixes. Having too
many unique hash prefixes
would make files too dispersed, and affect performance on other operations
such as listing.
+### Case2: Hudi Storage Cache Layer
+
+The cache layer of a lake table is a specific implementation scenario for
Federated Storage Layout in hudi tables.
+It divides the physical storage of the lake table into a high-performance
HDFS-based cache layer and a shared HDFS-based persistent layer.
+Hot data is first written to the cache layer and later moved to the persistent
layer through table services like Compaction and Clustering.
+This setup addresses the strong demands for performance and stability in
scenarios involving massive data ingestion into the lake.
+It is important to note that once data is written to the cache layer and
committed, it becomes visible to downstream processes, regardless of when
+the "relocation work" starts or finishes. Additionally, since the data
relocation from the cache layer to the persistent layer leverages the lake
table's
+own Compaction and Clustering capabilities, this process adheres to the lake
table's commit mechanism and MVCC snapshot isolation design. Therefore,
+after enabling the cache layer, the lake table maintains atomicity,
transactional guarantees, and Exactly Once semantics.
+
+Below is a comparison between the normal read/write process of a lake table
and the process after enabling the lake table cache layer.
+Green arrows indicate data reads, and red arrows indicate data writes. Before
enabling the cache layer, the compute engine directly writes
+data to the shared HDFS and commits, including Parquet base files, log files,
and metadata files. The compute engine queries the lake table
+by directly reading data from the shared HDFS. Additionally, the lake table
services for Clustering and Compaction also directly query data from
+the shared HDFS, process it, and write it back to the shared HDFS. After
enabling the lake table cache layer, the compute engine first writes hot
+data to the high-performance HDFS and commits, including Parquet files and log
files. During queries, a unified logical view of both the cache layer
+and the persistent layer is constructed to meet query demands. The Clustering
and Compaction table services, while performing regular lake table file
+organization, also facilitate data relocation from the cache layer to the
persistent layer. Notably, regardless of when Clustering and Compaction jobs
+start or finish, the data visible to downstream processes is always complete
and timely.
+
+Original Read/Write workflow
+
+
+Read/Write workflow with hudi cache layer enabled
+
+
+#### HoodieCacheLayerStorageStrategy
+Based on the HoodieActiveTimeline and the current write instant, determine the
specific write path. For common commit operations
+in COW (Copy on Write) tables and delta commit operations in MOR (Merge on
Read) tables, we generate a CacheLayer-related Storage
+Path to write/read. This type of I/O is targeted at the cache layer.
+
+As for commit action in mor table and replace commit in cow table, we will
generate a persistent Storage Path which will let
+Compaction/Clustering do the data migration works from cache layer to
persistent layer
+
+Note: It is required that Clustering is enabled for COW tables and Compaction
is enabled for MOR tables; otherwise, there is a
+risk of storage overflow in the cache layer.
+
+```java
+/**
+ * When using Storage Cache Layer make sure that table service is enabled :
+ * 1. MOR + Upsert + Compaction
+ * 2. COW + Insert + Clustering
+ */
+public class HoodieCacheLayerStorageStrategy extends
HoodieDefaultStorageStrategy {
+
+ private HoodieTableType tableType;
+ private String hoodieStorageStrategyModifyTime;
+
+ /**
+ * Only support on Storage Path as cache layer
+ * @param basePath
+ * @param storagePath
+ * @param metaClient
+ */
+ public HoodieCacheLayerStorageStrategy(String basePath,
+ String storagePath,
+ Option<HoodieTableMetaClient>
metaClient) {
+ // init
+ }
+
+ /**
+ * Generate StoragePath based on active instant time
+ * for common write instant :
+ * 1. commit for cow table
+ * 2. delta commit for mor table
+ * We will generate a CacheLayer related Storage Path to write/read
+ *
+ * As for commit action in mor table and replace commit in cow table,
+ * we will generate a persistent Storage Path which will let
Compaction/Clustering
+ * do the data migration works from cache layer to persistent layer
+ */
+ @Override
+ public StoragePath storageLocation(String path, String instantTime) {
+ if (isCommonCommit(instantTime)) {
+ return FSUtils.getPartitionPath(storagePaths.get(0), path);
Review Comment:
I'm assuming `storagePaths` here is populated by `getAllLocations`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]