This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new aa0f91f32 [AMORO-2861]: Support displaying Hudi table metadata in Amoro Dashboard (#2877)
aa0f91f32 is described below

commit aa0f91f3267979415e86ce60edba9d5513dc8763
Author: baiyangtx <[email protected]>
AuthorDate: Mon Aug 19 14:08:31 2024 +0800

    [AMORO-2861]: Support displaying Hudi table metadata in Amoro Dashboard (#2877)
    
    * Hudi | Add format catalog and Hadoop implementations.
    
    * Refactor | Move Hive-related common classes to the core module.
    
    * Show Hudi tables in dashboard
    
    * show correct table column field types.
    
    * show correct table properties
    
    * partition keys && primary keys
    
    * hoodie table files
    
    * snapshot summary
    
    * snapshot details
    
    * optimizing infos
    
    * optimizing task infos
    
    * spotless apply
    
    * fix checkstyle
    
    * use amoro shade
    
    * fix compile problems
    
    * remove useless codes
    
    * support cluster instant
    
    * fix unit test problems
    
    * set hudi to provided
    
    * fix spark unit test problems.
    
    * fix checkstyle
    
    * address review comments
    
    * fix conflict
    
    * fix checkstyle
    
    * dependency change
    
    * revert mock hive
    
    * no hudi format catalog in spark ut
    
    * revert optimizer config
    
    * test-jar
    
    * remove optimizer config
    
    * fix compile problem
    
    * add test-jar goal for jar-plugin
    
    * fix method
    
    ---------
    
    Co-authored-by: ZhouJinsong <[email protected]>
---
 .../src/assets/icons/svg/hudi.svg                  |  37 ++
 .../amoro-ams-dashboard/src/types/common.type.ts   |   1 +
 .../src/views/catalogs/Detail.vue                  |   5 +-
 amoro-ams/amoro-ams-server/pom.xml                 |  22 +
 .../amoro/server/catalog/CatalogBuilder.java       |   9 +-
 .../server/dashboard/FormatTableDescriptor.java    |   4 +-
 .../server/dashboard/HudiTableDescriptor.java      | 716 +++++++++++++++++++++
 .../dashboard/MixedAndIcebergTableDescriptor.java  |  34 +-
 .../server/dashboard/PaimonTableDescriptor.java    |  26 +-
 .../server/dashboard/ServerTableDescriptor.java    |   7 +-
 .../dashboard/controller/CatalogController.java    |   3 +
 .../dashboard/controller/TableController.java      |  17 +-
 .../dashboard/model/OptimizingProcessInfo.java     |   8 +-
 .../server/dashboard/model/OptimizingTaskInfo.java |   8 +-
 .../server/dashboard/model/ServerTableMeta.java    |   6 +-
 .../amoro/server/dashboard/model/TableMeta.java    |   4 +-
 .../amoro/server/dashboard/model/TableSummary.java |  74 +++
 .../apache/amoro/server/utils/HudiTableUtil.java   | 201 ++++++
 amoro-core/pom.xml                                 |  28 +
 .../org/apache/amoro/CommonUnifiedCatalog.java     |   9 +-
 .../main/java/org/apache/amoro/TableFormat.java    |   3 +-
 .../java/org/apache/amoro/data/DataFileType.java   |   3 +-
 .../amoro/formats/hudi/HudiCatalogFactory.java     |  56 ++
 .../amoro/formats/hudi/HudiHadoopCatalog.java      | 219 +++++++
 .../apache/amoro/formats/hudi/HudiHiveCatalog.java | 242 +++++++
 .../apache/amoro/formats/hudi/HudiSnapshot.java    | 103 +++
 .../org/apache/amoro/formats/hudi/HudiTable.java   | 281 ++++++++
 .../amoro/hive/AuthenticatedHiveClientPool.java    |   0
 .../apache/amoro/hive/CachedHiveClientPool.java    |   0
 .../main/java/org/apache/amoro/hive/HMSClient.java |   0
 .../java/org/apache/amoro/hive/HMSClientImpl.java  |   0
 .../java/org/apache/amoro/hive/HMSClientPool.java  |   0
 .../services/org.apache.amoro.FormatCatalogFactory |   3 +-
 .../ppr/PartitionExpressionForMetastore.java       |   0
 .../AuthorizationPreEventListener.java             |   0
 .../apache/amoro/spark/test/SparkTestContext.java  |  13 +-
 pom.xml                                            |  15 +
 37 files changed, 2093 insertions(+), 64 deletions(-)

diff --git a/amoro-ams/amoro-ams-dashboard/src/assets/icons/svg/hudi.svg 
b/amoro-ams/amoro-ams-dashboard/src/assets/icons/svg/hudi.svg
new file mode 100644
index 000000000..f2c99fcff
--- /dev/null
+++ b/amoro-ams/amoro-ams-dashboard/src/assets/icons/svg/hudi.svg
@@ -0,0 +1,37 @@
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+/-->
+
+<?xml version="1.0" encoding="UTF-8"?>
+<svg version="1.1" xmlns="http://www.w3.org/2000/svg"; width="500" height="342">
+<path d="M0 0 C3.5625 0.25 3.5625 0.25 6.5625 2.25 C7.6940918 4.15087891 
7.6940918 4.15087891 8.76171875 6.4765625 C9.36234131 7.76494019 9.36234131 
7.76494019 9.97509766 9.0793457 C10.39581543 10.00191162 10.8165332 10.92447754 
11.25 11.875 C12.14588929 13.79875789 13.04303695 15.72193024 13.94140625 
17.64453125 C14.6068042 19.07466553 14.6068042 19.07466553 15.28564453 
20.53369141 C17.02786051 24.23998108 18.84351208 27.90600574 20.6875 31.5625 
C21.1659436 32.51310303 21.1659436 32.513 [...]
+<path d="M0 0 C3.71599191 -0.20086443 4.77208445 -0.1519437 8 2 C9.1315918 
3.90087891 9.1315918 3.90087891 10.19921875 6.2265625 C10.79984131 7.51494019 
10.79984131 7.51494019 11.41259766 8.8293457 C11.83331543 9.75191162 12.2540332 
10.67447754 12.6875 11.625 C13.58338929 13.54875789 14.48053695 15.47193024 
15.37890625 17.39453125 C16.0443042 18.82466553 16.0443042 18.82466553 
16.72314453 20.28369141 C18.46536051 23.98998108 20.28101208 27.65600574 22.125 
31.3125 C22.6034436 32.26310303  [...]
+<path d="M0 0 C0.66 0 1.32 0 2 0 C2.61875 1.4540625 2.61875 1.4540625 3.25 
2.9375 C3.83308924 4.29177179 4.4164372 5.6459322 5 7 C5.26683594 7.6290625 
5.53367188 8.258125 5.80859375 8.90625 C9.20793429 16.7625037 13.14373863 
24.36171716 17 32 C20.03492556 38.02382865 23.0658637 44.04965596 26.09350586 
50.07714844 C27.00065428 51.88122854 27.90921697 53.6845981 28.8190918 
55.48730469 C30.15267049 58.13029703 31.48112603 60.77580923 32.80859375 
63.421875 C33.21581161 64.22598816 33.6230294 [...]
+<path d="M0 0 C1.33804688 0.59941406 1.33804688 0.59941406 2.703125 1.2109375 
C15.69346307 7.11944611 28.69871552 17.23742487 37.6875 28.25 C39.7042322 
31.01742194 39.7042322 31.01742194 42 32 C42 32.99 42 33.98 42 35 C42.66 35 
43.32 35 44 35 C44 35.66 44 36.32 44 37 C44.66 37 45.32 37 46 37 C47.38490744 
38.8436029 48.68388849 40.75222264 49.9375 42.6875 C50.62714844 43.74324219 
51.31679688 44.79898438 52.02734375 45.88671875 C57 53.73465347 57 53.73465347 
57 56 C57.66 56 58.32 56 59 56  [...]
+<path d="M0 0 C7.10730859 0.49165512 12.94352854 2.18206364 19.5625 4.75 
C21.51847161 5.48589557 23.47551174 6.21895711 25.43359375 6.94921875 
C26.38476074 7.30419434 27.33592773 7.65916992 28.31591797 8.02490234 
C31.88594031 9.32185243 35.48874733 10.4676789 39.125 11.5625 C45.20616785 
13.40868019 51.19767785 15.43907956 57.1875 17.5625 C62.59801962 19.47988906 
67.99892038 21.35461932 73.5 23 C79.2958658 24.74482871 85.02601951 26.6592699 
90.75 28.625 C91.64525391 28.92664063 92.5405078 [...]
+<path d="M0 0 C1.98 0.495 1.98 0.495 4 1 C4 1.66 4 2.32 4 3 C4.66 3 5.32 3 6 3 
C6.25394531 3.58007812 6.50789062 4.16015625 6.76953125 4.7578125 C8.74090598 
9.23216863 10.77575 13.64417709 13 18 C13.60526192 19.20778509 14.20932642 
20.41617064 14.8125 21.625 C15.15410156 22.30820313 15.49570313 22.99140625 
15.84765625 23.6953125 C29.16796875 50.3359375 29.16796875 50.3359375 
30.10327148 52.20727539 C30.95809217 53.91621857 31.81493178 55.62415142 
32.671875 57.33203125 C33.49252193 58.980 [...]
+<path d="M0 0 C9.22270478 -0.02313993 18.44539961 -0.04092354 27.66812706 
-0.05181217 C31.95031126 -0.05703738 36.23247564 -0.06412848 40.51464844 
-0.07543945 C44.64487529 -0.08628117 48.77508305 -0.09227957 52.90532303 
-0.09487724 C54.4834282 -0.09672917 56.06153229 -0.10034535 57.63962936 
-0.10573006 C59.84489984 -0.11295752 62.05009074 -0.11399022 64.25537109 
-0.11352539 C65.51223724 -0.115746 66.76910339 -0.11796661 68.0640564 
-0.12025452 C71 0 71 0 73 1 C74.57720066 1.09048137 76.15 [...]
+<path d="M0 0 C1.32 0.33 2.64 0.66 4 1 C4 1.66 4 2.32 4 3 C4.66 3 5.32 3 6 3 
C6.25394531 3.58007812 6.50789062 4.16015625 6.76953125 4.7578125 C8.74090598 
9.23216863 10.77575 13.64417709 13 18 C13.60526192 19.20778509 14.20932642 
20.41617064 14.8125 21.625 C15.15410156 22.30820313 15.49570313 22.99140625 
15.84765625 23.6953125 C29.16796875 50.3359375 29.16796875 50.3359375 
30.10327148 52.20727539 C30.95809217 53.91621857 31.81493178 55.62415142 
32.671875 57.33203125 C33.49252193 58.98056 [...]
+<path d="M0 0 C22.77 0 45.54 0 69 0 C68.67 1.65 68.34 3.3 68 5 C67.12980225 
4.83701416 66.25960449 4.67402832 65.36303711 4.50610352 C59.3427278 3.65251198 
53.25767763 3.87781205 47.19140625 3.90234375 C45.85619663 3.90421167 
44.52098631 3.9056338 43.18577576 3.90663147 C39.69237332 3.91042723 
36.19901327 3.920232 32.70562744 3.93133545 C29.1325335 3.9416132 25.55943263 
3.94614265 21.98632812 3.95117188 C14.99086814 3.96185725 7.99543611 3.97889785 
1 4 C0.67 2.68 0.34 1.36 0 0 Z " fill=" [...]
+<path d="M0 0 C3.71599191 -0.20086443 4.77208445 -0.1519437 8 2 C9.1315918 
3.90087891 9.1315918 3.90087891 10.19921875 6.2265625 C10.79984131 7.51494019 
10.79984131 7.51494019 11.41259766 8.8293457 C11.83331543 9.75191162 12.2540332 
10.67447754 12.6875 11.625 C13.58338929 13.54875789 14.48053695 15.47193024 
15.37890625 17.39453125 C15.82250488 18.3479541 16.26610352 19.30137695 
16.72314453 20.28369141 C18.46536051 23.98998108 20.28101208 27.65600574 22.125 
31.3125 C22.4439624 31.94623535 [...]
+<path d="M0 0 C1.32 0.33 2.64 0.66 4 1 C4 1.66 4 2.32 4 3 C4.66 3 5.32 3 6 3 
C6.25394531 3.58007812 6.50789062 4.16015625 6.76953125 4.7578125 C8.74090598 
9.23216863 10.77575 13.64417709 13 18 C13.60526192 19.20778509 14.20932642 
20.41617064 14.8125 21.625 C15.15410156 22.30820313 15.49570313 22.99140625 
15.84765625 23.6953125 C29.16796875 50.3359375 29.16796875 50.3359375 
30.10327148 52.20727539 C30.95809217 53.91621857 31.81493178 55.62415142 
32.671875 57.33203125 C33.49252193 58.98056 [...]
+<path d="M0 0 C0.99 0 1.98 0 3 0 C3.33 0.66 3.66 1.32 4 2 C25.45 2 46.9 2 69 2 
C69.495 3.98 69.495 3.98 70 6 C69.01 5.505 69.01 5.505 68 5 C66.19793348 
4.94792262 64.3937602 4.96306266 62.59155273 5.01000977 C61.45954941 5.03656143 
60.32754608 5.0631131 59.16123962 5.09046936 C57.31605492 5.14043312 
57.31605492 5.14043312 55.43359375 5.19140625 C52.83789782 5.24967058 
50.24219442 5.30760262 47.64648438 5.36523438 C43.54729814 5.46091206 
39.44827109 5.56169103 35.34936523 5.66870117 C31.3 [...]
+<path d="M0 0 C0.99 0.33 1.98 0.66 3 1 C3 1.66 3 2.32 3 3 C10.71136437 
5.12252678 17.82046441 5.18268236 25.78515625 4.9765625 C27.08198837 4.95153656 
28.3788205 4.92651062 29.71495056 4.90072632 C33.13293631 4.83464565 
36.55050049 4.75563571 39.96813965 4.67376709 C43.46752971 4.59228394 
46.96716416 4.5232563 50.46679688 4.453125 C57.31148468 4.31415141 64.15580744 
4.1614931 71 4 C71 4.66 71 5.32 71 6 C66.47633546 7.15115122 62.0672259 
7.07203586 57.4375 7 C50.62368169 6.94199323 43.864 [...]
+<path d="M0 0 C0.99 0 1.98 0 3 0 C2.67 0.99 2.34 1.98 2 3 C1.34 3 0.68 3 0 3 
C-0.66 4.98 -1.32 6.96 -2 9 C-3 6 -3 6 -1.5625 2.8125 C-1.046875 1.884375 
-0.53125 0.95625 0 0 Z " fill="#003150" transform="translate(259,70)"/>
+<path d="" fill="#004A5D" transform="translate(0,0)"/>
+</svg>
diff --git a/amoro-ams/amoro-ams-dashboard/src/types/common.type.ts 
b/amoro-ams/amoro-ams-dashboard/src/types/common.type.ts
index ec6d062ce..f697712bc 100644
--- a/amoro-ams/amoro-ams-dashboard/src/types/common.type.ts
+++ b/amoro-ams/amoro-ams-dashboard/src/types/common.type.ts
@@ -353,6 +353,7 @@ export enum tableTypeIconMap {
   ARCTIC = 'amoro',
   HIVE = 'hive',
   PAIMON = 'paimon',
+  HUDI = 'hudi',
 }
 
 export type ILineChartOriginalData = Record<string, Record<string, number>>
diff --git a/amoro-ams/amoro-ams-dashboard/src/views/catalogs/Detail.vue 
b/amoro-ams/amoro-ams-dashboard/src/views/catalogs/Detail.vue
index bce5d3174..61bc43066 100644
--- a/amoro-ams/amoro-ams-dashboard/src/views/catalogs/Detail.vue
+++ b/amoro-ams/amoro-ams-dashboard/src/views/catalogs/Detail.vue
@@ -113,6 +113,7 @@ const tableFormatMap = {
   ICEBERG: 'ICEBERG',
   MIXED_ICEBERG: 'MIXED_ICEBERG',
   PAIMON: 'PAIMON',
+  HUDI: 'HUDI',
 }
 
 const tableFormatText = {
@@ -120,10 +121,12 @@ const tableFormatText = {
   [tableFormatMap.MIXED_HIVE]: 'Mixed Hive',
   [tableFormatMap.MIXED_ICEBERG]: 'Mixed Iceberg',
   [tableFormatMap.PAIMON]: 'Paimon',
+  [tableFormatMap.HUDI]: 'Hudi',
 }
 const storeSupportFormat: { [prop: string]: string[] } = {
   ams: [tableFormatMap.MIXED_ICEBERG, tableFormatMap.ICEBERG],
-  hive: [tableFormatMap.MIXED_HIVE, tableFormatMap.MIXED_ICEBERG, 
tableFormatMap.ICEBERG, tableFormatMap.PAIMON],
+  hive: [tableFormatMap.MIXED_HIVE, tableFormatMap.MIXED_ICEBERG,
+      tableFormatMap.ICEBERG, tableFormatMap.PAIMON, tableFormatMap.HUDI],
   hadoop: [tableFormatMap.MIXED_ICEBERG, tableFormatMap.ICEBERG, 
tableFormatMap.PAIMON],
   glue: [tableFormatMap.MIXED_ICEBERG, tableFormatMap.ICEBERG],
   custom: [tableFormatMap.MIXED_ICEBERG, tableFormatMap.ICEBERG],
diff --git a/amoro-ams/amoro-ams-server/pom.xml 
b/amoro-ams/amoro-ams-server/pom.xml
index 016fa4859..0672b8895 100644
--- a/amoro-ams/amoro-ams-server/pom.xml
+++ b/amoro-ams/amoro-ams-server/pom.xml
@@ -194,6 +194,12 @@
             <artifactId>derby</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-dbcp2</artifactId>
@@ -292,6 +298,22 @@
             </exclusions>
         </dependency>
 
+        <!--  apache hudi dependencies -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-java-client</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <!-- runtime dependencies -->
         <dependency>
             <groupId>org.apache.amoro</groupId>
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java
index eb08e6aac..9aaa6ca17 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/catalog/CatalogBuilder.java
@@ -43,7 +43,11 @@ public class CatalogBuilder {
   private static final Map<String, Set<TableFormat>> formatSupportedMatrix =
       ImmutableMap.of(
           CATALOG_TYPE_HADOOP,
-              Sets.newHashSet(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, 
TableFormat.PAIMON),
+              Sets.newHashSet(
+                  TableFormat.ICEBERG,
+                  TableFormat.MIXED_ICEBERG,
+                  TableFormat.PAIMON,
+                  TableFormat.HUDI),
           CATALOG_TYPE_GLUE, Sets.newHashSet(TableFormat.ICEBERG, 
TableFormat.MIXED_ICEBERG),
           CATALOG_TYPE_CUSTOM, Sets.newHashSet(TableFormat.ICEBERG, 
TableFormat.MIXED_ICEBERG),
           CATALOG_TYPE_HIVE,
@@ -51,7 +55,8 @@ public class CatalogBuilder {
                   TableFormat.ICEBERG,
                   TableFormat.MIXED_ICEBERG,
                   TableFormat.MIXED_HIVE,
-                  TableFormat.PAIMON),
+                  TableFormat.PAIMON,
+                  TableFormat.HUDI),
           CATALOG_TYPE_AMS, Sets.newHashSet(TableFormat.ICEBERG, 
TableFormat.MIXED_ICEBERG));
 
   private static String getAmsURI(Configurations serviceConfig) {
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/FormatTableDescriptor.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/FormatTableDescriptor.java
index d844b32b1..72ecf271b 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/FormatTableDescriptor.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/FormatTableDescriptor.java
@@ -48,7 +48,7 @@ public interface FormatTableDescriptor {
       AmoroTable<?> amoroTable, String ref, OperationType operationType);
 
   /** Get the snapshot detail information of the {@link AmoroTable}. */
-  List<PartitionFileBaseInfo> getSnapshotDetail(AmoroTable<?> amoroTable, long 
snapshotId);
+  List<PartitionFileBaseInfo> getSnapshotDetail(AmoroTable<?> amoroTable, 
String snapshotId);
 
   /** Get the DDL information of the {@link AmoroTable}. */
   List<DDLInfo> getTableOperations(AmoroTable<?> amoroTable);
@@ -65,7 +65,7 @@ public interface FormatTableDescriptor {
       AmoroTable<?> amoroTable, int limit, int offset);
 
   /** Get the paged optimizing process tasks information of the {@link 
AmoroTable}. */
-  List<OptimizingTaskInfo> getOptimizingTaskInfos(AmoroTable<?> amoroTable, 
long processId);
+  List<OptimizingTaskInfo> getOptimizingTaskInfos(AmoroTable<?> amoroTable, 
String processId);
 
   /** Get the tag information of the {@link AmoroTable}. */
   List<TagOrBranchInfo> getTableTags(AmoroTable<?> amoroTable);
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/HudiTableDescriptor.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/HudiTableDescriptor.java
new file mode 100644
index 000000000..50bc36a47
--- /dev/null
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/HudiTableDescriptor.java
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.dashboard;
+
+import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.data.DataFileType;
+import org.apache.amoro.formats.hudi.HudiTable;
+import org.apache.amoro.server.AmoroServiceConstants;
+import org.apache.amoro.server.dashboard.model.AMSColumnInfo;
+import org.apache.amoro.server.dashboard.model.AMSPartitionField;
+import org.apache.amoro.server.dashboard.model.AmoroSnapshotsOfTable;
+import org.apache.amoro.server.dashboard.model.ConsumerInfo;
+import org.apache.amoro.server.dashboard.model.DDLInfo;
+import org.apache.amoro.server.dashboard.model.FilesStatistics;
+import org.apache.amoro.server.dashboard.model.OperationType;
+import org.apache.amoro.server.dashboard.model.OptimizingProcessInfo;
+import org.apache.amoro.server.dashboard.model.OptimizingTaskInfo;
+import org.apache.amoro.server.dashboard.model.PartitionBaseInfo;
+import org.apache.amoro.server.dashboard.model.PartitionFileBaseInfo;
+import org.apache.amoro.server.dashboard.model.ServerTableMeta;
+import org.apache.amoro.server.dashboard.model.TableSummary;
+import org.apache.amoro.server.dashboard.model.TagOrBranchInfo;
+import org.apache.amoro.server.dashboard.utils.AmsUtil;
+import org.apache.amoro.server.optimizing.OptimizingProcess;
+import org.apache.amoro.server.optimizing.OptimizingType;
+import org.apache.amoro.server.optimizing.TaskRuntime;
+import org.apache.amoro.server.utils.HudiTableUtil;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.avro.Schema;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieCompactionStrategy;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.avro.model.HoodieSliceInfo;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.table.HoodieJavaTable;
+import org.apache.iceberg.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Table descriptor for Hudi. */
+public class HudiTableDescriptor implements FormatTableDescriptor {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HudiTableDescriptor.class);
+
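+  // Executor used to parallelize partition-level file statistics collection.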
+  private final ExecutorService ioExecutors;
+
+  public HudiTableDescriptor(ExecutorService ioExecutor) {
+    this.ioExecutors = ioExecutor;
+  }
+
+  @Override
+  public List<TableFormat> supportFormat() {
+    return Lists.newArrayList(TableFormat.HUDI);
+  }
+
+  @Override
+  public ServerTableMeta getTableDetail(AmoroTable<?> amoroTable) {
+    HoodieJavaTable hoodieTable = (HoodieJavaTable) amoroTable.originalTable();
+    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+    HoodieTableConfig hoodieTableConfig = metaClient.getTableConfig();
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+    ServerTableMeta meta = new ServerTableMeta();
+    meta.setTableIdentifier(amoroTable.id());
+    meta.setTableType(TableFormat.HUDI.name());
+    List<AMSColumnInfo> columns = Lists.newArrayList();
+    try {
+      Schema schema = schemaResolver.getTableAvroSchema(false);
+      schema
+          .getFields()
+          .forEach(
+              field -> {
+                AMSColumnInfo columnInfo = new AMSColumnInfo();
+                columnInfo.setField(field.name());
+                columnInfo.setType(
+                    
HudiTableUtil.convertAvroSchemaToFieldType(field.schema()).toLowerCase());
+                columnInfo.setRequired(true);
+                columnInfo.setComment(field.doc());
+                columns.add(columnInfo);
+              });
+    } catch (Exception e) {
+      throw new IllegalStateException("Error when parsing table schema", e);
+    }
+    Map<String, AMSColumnInfo> columnMap =
+        columns.stream().collect(Collectors.toMap(AMSColumnInfo::getField, 
Function.identity()));
+    meta.setSchema(columns);
+    meta.setProperties(amoroTable.properties());
+    meta.setBaseLocation(metaClient.getBasePathV2().toString());
+
+    if (hoodieTableConfig.isTablePartitioned()) {
+      String[] partitionFields = hoodieTableConfig.getPartitionFields().get();
+      List<AMSPartitionField> partitions = new 
ArrayList<>(partitionFields.length);
+
+      for (String f : partitionFields) {
+        if (columnMap.containsKey(f)) {
+          partitions.add(new AMSPartitionField(f, null, null, null, null));
+        }
+      }
+      meta.setPartitionColumnList(partitions);
+    }
+    if (hoodieTableConfig.getRecordKeyFields().map(f -> f.length > 
0).orElse(false)) {
+      String[] recordFields = hoodieTableConfig.getRecordKeyFields().get();
+      List<AMSColumnInfo> primaryKeys = Lists.newArrayList();
+      for (String field : recordFields) {
+        if (columnMap.containsKey(field)) {
+          primaryKeys.add(columnMap.get(field));
+        }
+      }
+      meta.setPkList(primaryKeys);
+    }
+
+    HoodieTableMetadata hoodieTableMetadata = hoodieTable.getMetadata();
+    List<String> partitions;
+    try {
+      partitions = hoodieTableMetadata.getAllPartitionPaths();
+    } catch (IOException e) {
+      throw new RuntimeException("Error when load partitions for table: " + 
amoroTable.id(), e);
+    }
+
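+    // Scan all partitions through the file system view and aggregate base/log
+    // file counts and sizes into the table summary.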
+    SyncableFileSystemView fileSystemView = hoodieTable.getHoodieView();
+    Map<String, HudiTableUtil.HoodiePartitionMetric> metrics =
+        HudiTableUtil.statisticPartitionsMetric(partitions, fileSystemView, 
ioExecutors);
+    long baseFileCount = 0;
+    long logFileCount = 0;
+    long totalBaseSizeInByte = 0;
+    long totalLogSizeInByte = 0;
+    for (HudiTableUtil.HoodiePartitionMetric m : metrics.values()) {
+      baseFileCount += m.getBaseFileCount();
+      logFileCount += m.getLogFileCount();
+      totalBaseSizeInByte += m.getTotalBaseFileSizeInBytes();
+      totalLogSizeInByte += m.getTotalLogFileSizeInBytes();
+    }
+    long totalFileCount = baseFileCount + logFileCount;
+    long totalFileSize = totalBaseSizeInByte + totalLogSizeInByte;
+    String averageFileSize =
+        AmsUtil.byteToXB(totalFileCount == 0 ? 0 : totalFileSize / 
totalFileCount);
+
+    String tableType = metaClient.getTableType() == 
HoodieTableType.COPY_ON_WRITE ? "cow" : "mor";
+    String tableFormat = "Hudi(" + tableType + ")";
+    TableSummary tableSummary =
+        new TableSummary(
+            totalFileCount, AmsUtil.byteToXB(totalFileSize), averageFileSize, 
0, tableFormat);
+    meta.setTableSummary(tableSummary);
+
+    Map<String, Object> baseSummary = new HashMap<>();
+    baseSummary.put("totalSize", AmsUtil.byteToXB(totalBaseSizeInByte));
+    baseSummary.put("fileCount", baseFileCount);
+    baseSummary.put(
+        "averageFileSize",
+        AmsUtil.byteToXB(baseFileCount == 0 ? 0 : totalBaseSizeInByte / 
baseFileCount));
+    meta.setBaseMetrics(baseSummary);
+    if (HoodieTableType.MERGE_ON_READ == metaClient.getTableType()) {
+      Map<String, Object> logSummary = new HashMap<>();
+      logSummary.put("totalSize", AmsUtil.byteToXB(totalLogSizeInByte));
+      logSummary.put("fileCount", logFileCount);
+      logSummary.put(
+          "averageFileSize",
+          AmsUtil.byteToXB(logFileCount == 0 ? 0 : totalLogSizeInByte / 
logFileCount));
+      meta.setChangeMetrics(logSummary);
+    }
+    return meta;
+  }
+
+  @Override
+  public List<AmoroSnapshotsOfTable> getSnapshots(
+      AmoroTable<?> amoroTable, String ref, OperationType operationType) {
+    HudiTable hudiTable = (HudiTable) amoroTable;
+
+    return hudiTable.getSnapshotList(ioExecutors).stream()
+        .filter(
+            s ->
+                OperationType.ALL == operationType
+                    || 
operationType.name().equalsIgnoreCase(s.getOperationType()))
+        .map(
+            s -> {
+              AmoroSnapshotsOfTable snapshotsOfTable = new 
AmoroSnapshotsOfTable();
+              snapshotsOfTable.setSnapshotId(s.getSnapshotId());
+              snapshotsOfTable.setCommitTime(s.getCommitTimestamp());
+              snapshotsOfTable.setFileCount(s.getTotalFileCount());
+              snapshotsOfTable.setFileSize((int) s.getTotalFileSize());
+              snapshotsOfTable.setSummary(s.getSummary());
+              snapshotsOfTable.setOperation(s.getOperation());
+              Map<String, String> fileSummary = new HashMap<>();
+              fileSummary.put("delta-files", "0");
+              fileSummary.put("data-files", 
String.valueOf(s.getBaseFileCount()));
+              fileSummary.put("changelogs", 
String.valueOf(s.getLogFileCount()));
+              snapshotsOfTable.setFilesSummaryForChart(fileSummary);
+              return snapshotsOfTable;
+            })
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<PartitionFileBaseInfo> getSnapshotDetail(
+      AmoroTable<?> amoroTable, String snapshotId) {
+    HoodieJavaTable hoodieTable = (HoodieJavaTable) amoroTable.originalTable();
+    SyncableFileSystemView fileSystemView = hoodieTable.getHoodieView();
+    Map<String, Stream<FileSlice>> ptFsMap =
+        fileSystemView.getAllLatestFileSlicesBeforeOrOn(snapshotId);
+    List<PartitionFileBaseInfo> files = Lists.newArrayList();
+    for (String partition : ptFsMap.keySet()) {
+      Stream<FileSlice> fsStream = ptFsMap.get(partition);
+      List<PartitionFileBaseInfo> fileInPartition =
+          fsStream.flatMap(fs -> fileSliceToFileStream(partition, 
fs)).collect(Collectors.toList());
+      files.addAll(fileInPartition);
+    }
+    return files;
+  }
+
+  @Override
+  public List<DDLInfo> getTableOperations(AmoroTable<?> amoroTable) {
+    // Hudi doesn't support schema version tracking.
+    return Lists.newArrayList();
+  }
+
+  @Override
+  public List<PartitionBaseInfo> getTablePartitions(AmoroTable<?> amoroTable) {
+    HudiTable hudiTable = (HudiTable) amoroTable;
+    HoodieJavaTable hoodieTable = (HoodieJavaTable) amoroTable.originalTable();
+    List<String> partitions = hudiTable.getPartitions();
+    SyncableFileSystemView fileSystemView = hoodieTable.getHoodieView();
+    Map<String, HudiTableUtil.HoodiePartitionMetric> metrics =
+        HudiTableUtil.statisticPartitionsMetric(partitions, fileSystemView, 
ioExecutors);
+    return metrics.entrySet().stream()
+        .map(
+            e -> {
+              PartitionBaseInfo p = new PartitionBaseInfo();
+              p.setPartition(e.getKey());
+              p.setFileCount(e.getValue().getBaseFileCount() + 
e.getValue().getLogFileCount());
+              p.setFileSize(
+                  e.getValue().getTotalBaseFileSizeInBytes()
+                      + e.getValue().getTotalLogFileSizeInBytes());
+              return p;
+            })
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<PartitionFileBaseInfo> getTableFiles(
+      AmoroTable<?> amoroTable, String partition, Integer specId) {
+    HoodieJavaTable hoodieTable = (HoodieJavaTable) amoroTable.originalTable();
+    SyncableFileSystemView fileSystemView = hoodieTable.getHoodieView();
+    Stream<FileSlice> fileSliceStream = 
fileSystemView.getLatestFileSlices(partition);
+    return fileSliceStream
+        .flatMap(fs -> fileSliceToFileStream(partition, fs))
+        .collect(Collectors.toList());
+  }
+
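+  /** Flattens a file slice into one entry for the base file plus one per log file. */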
+  private Stream<PartitionFileBaseInfo> fileSliceToFileStream(String 
partition, FileSlice fs) {
+    List<PartitionFileBaseInfo> files = Lists.newArrayList();
+    if (fs.getBaseFile().isPresent()) {
+      HoodieBaseFile baseFile = fs.getBaseFile().get();
+      long commitTime = parseHoodieCommitTime(baseFile.getCommitTime());
+      PartitionFileBaseInfo file =
+          new PartitionFileBaseInfo(
+              baseFile.getCommitTime(),
+              DataFileType.BASE_FILE,
+              commitTime,
+              partition,
+              0,
+              baseFile.getPath(),
+              baseFile.getFileSize());
+      files.add(file);
+    }
+    fs.getLogFiles()
+        .forEach(
+            l -> {
+              // TODO: can't get commit time from log file
+              PartitionFileBaseInfo file =
+                  new PartitionFileBaseInfo(
+                      "",
+                      DataFileType.LOG_FILE,
+                      0L,
+                      partition,
+                      0,
+                      l.getPath().toString(),
+                      l.getFileSize());
+              files.add(file);
+            });
+    return files.stream();
+  }
+
+  @Override
+  public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
+      AmoroTable<?> amoroTable, int limit, int offset) {
+    HoodieJavaTable hoodieTable = (HoodieJavaTable) amoroTable.originalTable();
+    HoodieDefaultTimeline timeline = new 
HoodieActiveTimeline(hoodieTable.getMetaClient(), false);
+    List<HoodieInstant> instants = timeline.getInstants();
+    Map<String, HoodieInstant> instantMap = Maps.newHashMap();
+    for (HoodieInstant instant : instants) {
+      instantMap.put(instant.getTimestamp() + "_" + instant.getState().name(), 
instant);
+    }
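+    // Collect REQUESTED instants for compaction, clean and replace-commit actions;
+    // only compaction and clustering plans yield process info below.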
+    Set<String> optimizingActions =
+        Sets.newHashSet(
+            HoodieTimeline.COMPACTION_ACTION,
+            HoodieTimeline.CLEAN_ACTION,
+            HoodieTimeline.REPLACE_COMMIT_ACTION);
+
+    List<String> timestamps =
+        instants.stream()
+            .filter(i -> i.getState() == HoodieInstant.State.REQUESTED)
+            .filter(i -> optimizingActions.contains(i.getAction()))
+            .map(HoodieInstant::getTimestamp)
+            .collect(Collectors.toList());
+
+    List<OptimizingProcessInfo> infos =
+        timestamps.stream()
+            .map(
+                t -> {
+                  OptimizingProcessInfo processInfo = null;
+                  try {
+                    processInfo = getOptimizingInfo(t, instantMap, timeline);
+                    if (processInfo == null) {
+                      return null;
+                    }
+                    processInfo.setCatalogName(amoroTable.id().getCatalog());
+                    processInfo.setDbName(amoroTable.id().getDatabase());
+                    processInfo.setTableName(amoroTable.id().getTableName());
+                    return processInfo;
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                })
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList());
+    return Pair.of(infos, infos.size());
+  }
+
+  protected OptimizingProcessInfo getOptimizingInfo(
+      String instantTimestamp, Map<String, HoodieInstant> instantMap, 
HoodieTimeline timeline)
+      throws IOException {
+    OptimizingProcessInfo processInfo = new OptimizingProcessInfo();
+    processInfo.setProcessId(instantTimestamp);
+    HoodieInstant request =
+        instantMap.get(instantTimestamp + "_" + 
HoodieInstant.State.REQUESTED.name());
+    if (request == null) {
+      return null;
+    }
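+    // A Hudi instant moves REQUESTED -> INFLIGHT -> COMPLETED; the requested
+    // instant carries the plan and the completed instant the commit metadata.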
+    long startTime = parseHoodieCommitTime(request.getStateTransitionTime());
+    processInfo.setStartTime(startTime);
+    Option<byte[]> detail = timeline.getInstantDetails(request);
+    if (!detail.isPresent()) {
+      return null;
+    }
+    processInfo.setSummary(Maps.newHashMap());
+    if (HoodieTimeline.COMPACTION_ACTION.equals(request.getAction())) {
+      fillCompactProcessInfo(processInfo, detail.get());
+    } else if 
(HoodieTimeline.REPLACE_COMMIT_ACTION.equals(request.getAction())) {
+      processInfo = fillClusterProcessInfo(processInfo, detail.get());
+    } else {
+      return null;
+    }
+    if (processInfo == null) {
+      // the replace commit is an insert_overwrite, not a clustering operation
+      return null;
+    }
+
+    HoodieInstant commit =
+        instantMap.get(instantTimestamp + "_" + 
HoodieInstant.State.COMPLETED.name());
+
+    if (commit != null) {
+      processInfo.setSuccessTasks(processInfo.getTotalTasks());
+      long commitTimestamp = 
parseHoodieCommitTime(commit.getStateTransitionTime());
+      processInfo.setDuration(commitTimestamp - startTime);
+      processInfo.setFinishTime(commitTimestamp);
+      processInfo.setStatus(OptimizingProcess.Status.SUCCESS);
+      Option<byte[]> commitDetail = timeline.getInstantDetails(commit);
+      HoodieCommitMetadata commitMetadata =
+          HoodieCommitMetadata.fromBytes(commitDetail.get(), 
HoodieCommitMetadata.class);
+      Map<String, List<HoodieWriteStat>> commitInfo = 
commitMetadata.getPartitionToWriteStats();
+      int outputFile = 0;
+      long outputFileSize = 0;
+      for (String partition : commitInfo.keySet()) {
+        List<HoodieWriteStat> writeStats = commitInfo.get(partition);
+        for (HoodieWriteStat stat : writeStats) {
+          outputFile += 1;
+          outputFileSize += stat.getFileSizeInBytes();
+        }
+      }
+      processInfo.setOutputFiles(FilesStatistics.build(outputFile, 
outputFileSize));
+    } else {
+      HoodieInstant inf =
+          instantMap.get(instantTimestamp + "_" + 
HoodieInstant.State.INFLIGHT.name());
+      if (inf != null) {
+        processInfo.setStatus(OptimizingProcess.Status.RUNNING);
+      }
+    }
+    return processInfo;
+  }
+
+  private void fillCompactProcessInfo(OptimizingProcessInfo processInfo, 
byte[] requestDetails)
+      throws IOException {
+    HoodieCompactionPlan compactionPlan =
+        TimelineMetadataUtils.deserializeCompactionPlan(requestDetails);
+    int inputFileCount = 0;
+    long inputFileSize = 0;
+    for (HoodieCompactionOperation operation : compactionPlan.getOperations()) 
{
+      if (StringUtils.nonEmpty(operation.getDataFilePath())) {
+        inputFileCount += 1;
+      }
+      inputFileCount += operation.getDeltaFilePaths().size();
+      inputFileSize += 
operation.getMetrics().getOrDefault("TOTAL_LOG_FILES_SIZE", 0.0);
+    }
+    processInfo.setInputFiles(FilesStatistics.build(inputFileCount, 
inputFileSize));
+    int tasks = compactionPlan.getOperations().size();
+    processInfo.setTotalTasks(tasks);
+    HoodieCompactionStrategy strategy = compactionPlan.getStrategy();
+    if (strategy != null) {
+      processInfo.getSummary().put("strategy", 
strategy.getCompactorClassName());
+      processInfo.getSummary().putAll(strategy.getStrategyParams());
+    }
+    processInfo.setOptimizingType(OptimizingType.MINOR);
+  }
+
+  private OptimizingProcessInfo fillClusterProcessInfo(
+      OptimizingProcessInfo processInfo, byte[] requestDetails) throws 
IOException {
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata =
+        
TimelineMetadataUtils.deserializeRequestedReplaceMetadata(requestDetails);
+    String operationType = requestedReplaceMetadata.getOperationType();
+    if (!WriteOperationType.CLUSTER.name().equalsIgnoreCase(operationType)) {
+      return null;
+    }
+    HoodieClusteringPlan plan = requestedReplaceMetadata.getClusteringPlan();
+    int inputFileCount = 0;
+    long inputFileSize = 0;
+    for (HoodieClusteringGroup group : plan.getInputGroups()) {
+      for (HoodieSliceInfo slice : group.getSlices()) {
+        inputFileCount++;
+        if (slice.getDeltaFilePaths() != null) {
+          inputFileCount += slice.getDeltaFilePaths().size();
+        }
+      }
+      inputFileSize += group.getMetrics().getOrDefault("TOTAL_LOG_FILES_SIZE", 
0.0);
+    }
+    processInfo.setInputFiles(FilesStatistics.build(inputFileCount, 
inputFileSize));
+    int tasks = plan.getInputGroups().size();
+    processInfo.setTotalTasks(tasks);
+    processInfo.setOptimizingType(OptimizingType.MAJOR);
+
+    HoodieClusteringStrategy strategy = plan.getStrategy();
+    if (strategy != null) {
+      processInfo.getSummary().put("strategy", 
strategy.getStrategyClassName());
+      processInfo.getSummary().putAll(strategy.getStrategyParams());
+    }
+    return processInfo;
+  }
+
+  @Override
+  public List<OptimizingTaskInfo> getOptimizingTaskInfos(
+      AmoroTable<?> amoroTable, String processId) {
+    HoodieJavaTable hoodieTable = (HoodieJavaTable) amoroTable.originalTable();
+    HoodieDefaultTimeline timeline = new 
HoodieActiveTimeline(hoodieTable.getMetaClient(), false);
+    List<HoodieInstant> instants = timeline.getInstants();
+    HoodieInstant request = null;
+    HoodieInstant complete = null;
+    for (HoodieInstant instant : instants) {
+      if (processId.equals(instant.getTimestamp())) {
+        if (instant.getState() == HoodieInstant.State.REQUESTED) {
+          request = instant;
+        } else if (instant.getState() == HoodieInstant.State.COMPLETED) {
+          complete = instant;
+        }
+      }
+    }
+    if (request == null) {
+      return Lists.newArrayList();
+    }
+    long startTime = parseHoodieCommitTime(request.getStateTransitionTime());
+    long endTime = AmoroServiceConstants.INVALID_TIME;
+    long costTime = AmoroServiceConstants.INVALID_TIME;
+    TaskRuntime.Status status = TaskRuntime.Status.ACKED;
+    Option<byte[]> requestDetails = timeline.getInstantDetails(request);
+    if (!requestDetails.isPresent()) {
+      return Lists.newArrayList();
+    }
+    byte[] commitDetails = null;
+    if (complete != null) {
+      status = TaskRuntime.Status.SUCCESS;
+      endTime = parseHoodieCommitTime(complete.getStateTransitionTime());
+      costTime = endTime - startTime;
+      Option<byte[]> commitDetail = timeline.getInstantDetails(complete);
+      if (commitDetail.isPresent()) {
+        commitDetails = commitDetail.get();
+      }
+    }
+
+    List<OptimizingTaskInfo> tasks = Lists.newArrayList();
+
+    try {
+      if 
(HoodieTimeline.COMPACTION_ACTION.equalsIgnoreCase(request.getAction())) {
+        tasks = getCompactTasks(requestDetails.get(), commitDetails);
+      } else if 
(HoodieTimeline.REPLACE_COMMIT_ACTION.equalsIgnoreCase(request.getAction())) {
+        tasks = getClusterTasks(requestDetails.get(), commitDetails);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("", e);
+    }
+    for (OptimizingTaskInfo task : tasks) {
+      task.setStartTime(startTime);
+      task.setEndTime(endTime);
+      task.setStatus(status);
+      task.setCostTime(costTime);
+    }
+    return tasks;
+  }
+
+  private List<OptimizingTaskInfo> getCompactTasks(byte[] requestDetails, 
byte[] commitDetails)
+      throws IOException {
+    Map<String, Pair<String, FilesStatistics>> inputFileStatistic =
+        getCompactInputTaskStatistics(requestDetails);
+    Map<String, FilesStatistics> outputFileStatistic = Maps.newHashMap();
+    if (commitDetails != null) {
+      HoodieCommitMetadata commitMetadata =
+          HoodieCommitMetadata.fromBytes(commitDetails, 
HoodieCommitMetadata.class);
+      Map<String, List<HoodieWriteStat>> commitInfo = 
commitMetadata.getPartitionToWriteStats();
+      for (String partition : commitInfo.keySet()) {
+        List<HoodieWriteStat> writeStats = commitInfo.get(partition);
+        for (HoodieWriteStat stat : writeStats) {
+          FilesStatistics fs = new FilesStatistics(1, 
stat.getFileSizeInBytes());
+          outputFileStatistic.put(stat.getFileId(), fs);
+        }
+      }
+    }
+    List<OptimizingTaskInfo> results = Lists.newArrayList();
+    int taskId = 0;
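+    // Emit one task per compacted file group, joining output statistics by file id.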
+    for (String fileGroupId : inputFileStatistic.keySet()) {
+      FilesStatistics input = inputFileStatistic.get(fileGroupId).second();
+      String partition = inputFileStatistic.get(fileGroupId).first();
+      OptimizingTaskInfo task =
+          new OptimizingTaskInfo(
+              -1L,
+              null,
+              taskId++,
+              partition,
+              null,
+              0,
+              "",
+              0,
+              0,
+              0,
+              0,
+              "",
+              input,
+              outputFileStatistic.get(fileGroupId),
+              Maps.newHashMap(),
+              Maps.newHashMap());
+      results.add(task);
+    }
+    return results;
+  }
+
+  private Map<String, Pair<String, FilesStatistics>> 
getCompactInputTaskStatistics(
+      byte[] requestDetails) throws IOException {
+    HoodieCompactionPlan compactionPlan =
+        TimelineMetadataUtils.deserializeCompactionPlan(requestDetails);
+    Map<String, Pair<String, FilesStatistics>> inputFileGroups = 
Maps.newHashMap();
+    for (HoodieCompactionOperation operation : compactionPlan.getOperations()) 
{
+      int inputFileCount = operation.getDeltaFilePaths().size();
+      long inputFileSize =
+          operation.getMetrics().getOrDefault("TOTAL_LOG_FILES_SIZE", 
0.0).longValue();
+      FilesStatistics statistics = FilesStatistics.build(inputFileCount, 
inputFileSize);
+      inputFileGroups.put(operation.getFileId(), 
Pair.of(operation.getPartitionPath(), statistics));
+    }
+    return inputFileGroups;
+  }
+
+  private List<OptimizingTaskInfo> getClusterTasks(byte[] requestDetails, 
byte[] commitDetails)
+      throws IOException {
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata =
+        
TimelineMetadataUtils.deserializeRequestedReplaceMetadata(requestDetails);
+    String operationType = requestedReplaceMetadata.getOperationType();
+    if (!WriteOperationType.CLUSTER.name().equalsIgnoreCase(operationType)) {
+      return Lists.newArrayList();
+    }
+    Map<String, FilesStatistics> outputStatisticMap = Maps.newHashMap();
+    if (commitDetails != null) {
+      HoodieCommitMetadata commitMetadata =
+          HoodieCommitMetadata.fromBytes(commitDetails, 
HoodieCommitMetadata.class);
+      Map<String, List<HoodieWriteStat>> commitInfo = 
commitMetadata.getPartitionToWriteStats();
+      for (String partition : commitInfo.keySet()) {
+        List<HoodieWriteStat> writeStats = commitInfo.get(partition);
+        int fileCount = 0;
+        long fileSize = 0;
+        for (HoodieWriteStat stat : writeStats) {
+          fileCount += 1;
+          fileSize += stat.getFileSizeInBytes();
+        }
+        outputStatisticMap.put(partition, FilesStatistics.build(fileCount, 
fileSize));
+      }
+    }
+
+    List<OptimizingTaskInfo> taskInfoList = Lists.newArrayList();
+    HoodieClusteringPlan plan = requestedReplaceMetadata.getClusteringPlan();
+    int taskId = 0;
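+    // Emit one task per clustering input group; output statistics are keyed by partition.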
+    for (HoodieClusteringGroup group : plan.getInputGroups()) {
+      FilesStatistics inputStatistic = getClusterGroupStatistic(group);
+      String partition = group.getSlices().get(0).getPartitionPath();
+      FilesStatistics outputStatistic = outputStatisticMap.get(partition);
+      OptimizingTaskInfo task =
+          new OptimizingTaskInfo(
+              -1L,
+              null,
+              taskId++,
+              partition,
+              null,
+              0,
+              "",
+              0,
+              0,
+              0,
+              0,
+              "",
+              inputStatistic,
+              outputStatistic,
+              Maps.newHashMap(),
+              Maps.newHashMap());
+      taskInfoList.add(task);
+    }
+
+    return taskInfoList;
+  }
+
+  private FilesStatistics getClusterGroupStatistic(HoodieClusteringGroup 
group) {
+    int inputFileCount = 0;
+    long inputFileSize = 0;
+    for (HoodieSliceInfo slice : group.getSlices()) {
+      inputFileCount++;
+      if (slice.getDeltaFilePaths() != null) {
+        inputFileCount += slice.getDeltaFilePaths().size();
+      }
+    }
+    inputFileSize += group.getMetrics().getOrDefault("TOTAL_LOG_FILES_SIZE", 
0.0);
+    return FilesStatistics.build(inputFileCount, inputFileSize);
+  }
+
+  @Override
+  public List<TagOrBranchInfo> getTableTags(AmoroTable<?> amoroTable) {
+    // Hudi doesn't support tags and branches.
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<TagOrBranchInfo> getTableBranches(AmoroTable<?> amoroTable) {
+    return Lists.newArrayList(
+        new TagOrBranchInfo("hoodie-timeline", -1, -1, 0L, 0L, 
TagOrBranchInfo.BRANCH));
+  }
+
+  @Override
+  public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {
+    return Collections.emptyList();
+  }
+
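+  /** Converts a Hudi instant timestamp string into epoch milliseconds. */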
+  private long parseHoodieCommitTime(String commitTime) {
+    try {
+      Date date = 
HoodieInstantTimeGenerator.parseDateFromInstantTime(commitTime);
+      return date.getTime();
+    } catch (ParseException e) {
+      throw new RuntimeException("Error when parse timestamp:" + commitTime, 
e);
+    }
+  }
+}
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
index 21ca369c9..1db9ad37b 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
@@ -39,6 +39,7 @@ import 
org.apache.amoro.server.dashboard.model.PartitionFileBaseInfo;
 import org.apache.amoro.server.dashboard.model.ServerTableMeta;
 import org.apache.amoro.server.dashboard.model.TableBasicInfo;
 import org.apache.amoro.server.dashboard.model.TableStatistics;
+import org.apache.amoro.server.dashboard.model.TableSummary;
 import org.apache.amoro.server.dashboard.model.TagOrBranchInfo;
 import org.apache.amoro.server.dashboard.utils.AmsUtil;
 import org.apache.amoro.server.dashboard.utils.TableStatCollector;
@@ -151,14 +152,11 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
       }
       serverTableMeta.setChangeMetrics(changeMetrics);
     }
-    Map<String, Object> tableSummary = new HashMap<>();
-    tableSummary.put("size", AmsUtil.byteToXB(tableSize));
-    tableSummary.put("file", tableFileCnt);
-    tableSummary.put(
-        "averageFile", AmsUtil.byteToXB(tableFileCnt == 0 ? 0 : tableSize / 
tableFileCnt));
-
-    tableSummary.put("records", getRecordsOfTable(table));
-    tableSummary.put("tableFormat", tableFormat);
+    String averageFileSize = AmsUtil.byteToXB(tableFileCnt == 0 ? 0 : 
tableSize / tableFileCnt);
+    long records = getRecordsOfTable(table);
+    TableSummary tableSummary =
+        new TableSummary(
+            tableFileCnt, AmsUtil.byteToXB(tableSize), averageFileSize, 
records, tableFormat);
     serverTableMeta.setTableSummary(tableSummary);
     return serverTableMeta;
   }
@@ -328,7 +326,8 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
   }
 
   @Override
-  public List<PartitionFileBaseInfo> getSnapshotDetail(AmoroTable<?> 
amoroTable, long snapshotId) {
+  public List<PartitionFileBaseInfo> getSnapshotDetail(
+      AmoroTable<?> amoroTable, String snapshotId) {
     MixedTable mixedTable = getTable(amoroTable);
     List<PartitionFileBaseInfo> result = new ArrayList<>();
     Snapshot snapshot;
@@ -345,14 +344,13 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
           "unknown snapshot " + snapshotId + " of " + amoroTable.id());
     }
     final long snapshotTime = snapshot.timestampMillis();
-    String commitId = String.valueOf(snapshotId);
     snapshot
         .addedDataFiles(mixedTable.io())
         .forEach(
             f ->
                 result.add(
                     new PartitionFileBaseInfo(
-                        commitId,
+                        snapshotId,
                         DataFileType.ofContentId(f.content().id()),
                         snapshotTime,
                         
MixedTableUtil.getMixedTablePartitionSpecById(mixedTable, f.specId())
@@ -366,7 +364,7 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
             f ->
                 result.add(
                     new PartitionFileBaseInfo(
-                        commitId,
+                        snapshotId,
                         DataFileType.ofContentId(f.content().id()),
                         snapshotTime,
                         
MixedTableUtil.getMixedTablePartitionSpecById(mixedTable, f.specId())
@@ -380,7 +378,7 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
             f ->
                 result.add(
                     new PartitionFileBaseInfo(
-                        commitId,
+                        snapshotId,
                         DataFileType.ofContentId(f.content().id()),
                         snapshotTime,
                         
MixedTableUtil.getMixedTablePartitionSpecById(mixedTable, f.specId())
@@ -394,7 +392,7 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
             f ->
                 result.add(
                     new PartitionFileBaseInfo(
-                        commitId,
+                        snapshotId,
                         DataFileType.ofContentId(f.content().id()),
                         snapshotTime,
                         
MixedTableUtil.getMixedTablePartitionSpecById(mixedTable, f.specId())
@@ -524,11 +522,13 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
   }
 
   @Override
-  public List<OptimizingTaskInfo> getOptimizingTaskInfos(AmoroTable<?> 
amoroTable, long processId) {
+  public List<OptimizingTaskInfo> getOptimizingTaskInfos(
+      AmoroTable<?> amoroTable, String processId) {
+    long id = Long.parseLong(processId);
     List<OptimizingTaskMeta> optimizingTaskMetaList =
         getAs(
             OptimizingMapper.class,
-            mapper -> 
mapper.selectOptimizeTaskMetas(Collections.singletonList(processId)));
+            mapper -> 
mapper.selectOptimizeTaskMetas(Collections.singletonList(id)));
     if (CollectionUtils.isEmpty(optimizingTaskMetaList)) {
       return Collections.emptyList();
     }
@@ -537,7 +537,7 @@ public class MixedAndIcebergTableDescriptor extends 
PersistentBase
             taskMeta ->
                 new OptimizingTaskInfo(
                     taskMeta.getTableId(),
-                    taskMeta.getProcessId(),
+                    String.valueOf(taskMeta.getProcessId()),
                     taskMeta.getTaskId(),
                     taskMeta.getPartitionData(),
                     taskMeta.getStatus(),
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
index 44b9364e7..5b0d225b0 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/PaimonTableDescriptor.java
@@ -38,6 +38,7 @@ import 
org.apache.amoro.server.dashboard.model.OptimizingTaskInfo;
 import org.apache.amoro.server.dashboard.model.PartitionBaseInfo;
 import org.apache.amoro.server.dashboard.model.PartitionFileBaseInfo;
 import org.apache.amoro.server.dashboard.model.ServerTableMeta;
+import org.apache.amoro.server.dashboard.model.TableSummary;
 import org.apache.amoro.server.dashboard.model.TagOrBranchInfo;
 import org.apache.amoro.server.dashboard.utils.AmsUtil;
 import org.apache.amoro.server.dashboard.utils.FilesStatisticsBuilder;
@@ -136,10 +137,9 @@ public class PaimonTableDescriptor implements 
FormatTableDescriptor {
     // properties
     serverTableMeta.setProperties(table.options());
 
-    Map<String, Object> tableSummary = new HashMap<>();
     Map<String, Object> baseMetric = new HashMap<>();
     // table summary
-    tableSummary.put("tableFormat", 
AmsUtil.formatString(amoroTable.format().name()));
+    TableSummary tableSummary;
     Snapshot snapshot = store.snapshotManager().latestSnapshot();
     if (snapshot != null) {
       AmoroSnapshotsOfTable snapshotsOfTable =
@@ -150,10 +150,9 @@ public class PaimonTableDescriptor implements 
FormatTableDescriptor {
 
       String averageFileSize = AmsUtil.byteToXB(fileCount == 0 ? 0 : fileSize 
/ fileCount);
 
-      tableSummary.put("averageFile", averageFileSize);
-      tableSummary.put("file", fileCount);
-      tableSummary.put("size", totalSize);
-      tableSummary.put("records", snapshotsOfTable.getRecords());
+      tableSummary =
+          new TableSummary(
+              fileCount, totalSize, averageFileSize, 
snapshotsOfTable.getRecords(), "paimon");
 
       baseMetric.put("totalSize", totalSize);
       baseMetric.put("fileCount", fileCount);
@@ -164,9 +163,7 @@ public class PaimonTableDescriptor implements 
FormatTableDescriptor {
         baseMetric.put("baseWatermark", watermark);
       }
     } else {
-      tableSummary.put("size", 0);
-      tableSummary.put("file", 0);
-      tableSummary.put("averageFile", 0);
+      tableSummary = new TableSummary(0, "0", "0", 0, "paimon");
 
       baseMetric.put("totalSize", 0);
       baseMetric.put("fileCount", 0);
@@ -225,10 +222,12 @@ public class PaimonTableDescriptor implements 
FormatTableDescriptor {
   }
 
   @Override
-  public List<PartitionFileBaseInfo> getSnapshotDetail(AmoroTable<?> 
amoroTable, long snapshotId) {
+  public List<PartitionFileBaseInfo> getSnapshotDetail(
+      AmoroTable<?> amoroTable, String snapshotId) {
     FileStoreTable table = getTable(amoroTable);
     List<PartitionFileBaseInfo> amsDataFileInfos = new ArrayList<>();
-    Snapshot snapshot = table.snapshotManager().snapshot(snapshotId);
+    long commitId = Long.parseLong(snapshotId);
+    Snapshot snapshot = table.snapshotManager().snapshot(commitId);
     FileStore<?> store = table.store();
     FileStorePathFactory fileStorePathFactory = store.pathFactory();
     ManifestList manifestList = store.manifestListFactory().create();
@@ -385,7 +384,7 @@ public class PaimonTableDescriptor implements 
FormatTableDescriptor {
               .map(
                   s -> {
                     OptimizingProcessInfo optimizingProcessInfo = new 
OptimizingProcessInfo();
-                    optimizingProcessInfo.setProcessId(s.id());
+                    optimizingProcessInfo.setProcessId(String.valueOf(s.id()));
                     
optimizingProcessInfo.setCatalogName(tableIdentifier.getCatalog());
                     
optimizingProcessInfo.setDbName(tableIdentifier.getDatabase());
                     
optimizingProcessInfo.setTableName(tableIdentifier.getTableName());
@@ -445,7 +444,8 @@ public class PaimonTableDescriptor implements 
FormatTableDescriptor {
   }
 
   @Override
-  public List<OptimizingTaskInfo> getOptimizingTaskInfos(AmoroTable<?> 
amoroTable, long processId) {
+  public List<OptimizingTaskInfo> getOptimizingTaskInfos(
+      AmoroTable<?> amoroTable, String processId) {
     throw new UnsupportedOperationException();
   }
 
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
index 0df27a730..21f77331d 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/ServerTableDescriptor.java
@@ -58,7 +58,8 @@ public class ServerTableDescriptor extends PersistentBase {
     FormatTableDescriptor[] formatTableDescriptors =
         new FormatTableDescriptor[] {
           new MixedAndIcebergTableDescriptor(executorService),
-          new PaimonTableDescriptor(executorService)
+          new PaimonTableDescriptor(executorService),
+          new HudiTableDescriptor(executorService)
         };
     for (FormatTableDescriptor formatTableDescriptor : formatTableDescriptors) 
{
       for (TableFormat format : formatTableDescriptor.supportFormat()) {
@@ -81,7 +82,7 @@ public class ServerTableDescriptor extends PersistentBase {
   }
 
   public List<PartitionFileBaseInfo> getSnapshotDetail(
-      TableIdentifier tableIdentifier, long snapshotId) {
+      TableIdentifier tableIdentifier, String snapshotId) {
     AmoroTable<?> amoroTable = loadTable(tableIdentifier);
     FormatTableDescriptor formatTableDescriptor = 
formatDescriptorMap.get(amoroTable.format());
     return formatTableDescriptor.getSnapshotDetail(amoroTable, snapshotId);
@@ -132,7 +133,7 @@ public class ServerTableDescriptor extends PersistentBase {
   }
 
   public List<OptimizingTaskInfo> getOptimizingProcessTaskInfos(
-      TableIdentifier tableIdentifier, long processId) {
+      TableIdentifier tableIdentifier, String processId) {
     AmoroTable<?> amoroTable = loadTable(tableIdentifier);
     FormatTableDescriptor formatTableDescriptor = 
formatDescriptorMap.get(amoroTable.format());
     return formatTableDescriptor.getOptimizingTaskInfos(amoroTable, processId);
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java
index da0d33be8..a7a78c9b1 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/CatalogController.java
@@ -18,6 +18,7 @@
 
 package org.apache.amoro.server.dashboard.controller;
 
+import static org.apache.amoro.TableFormat.HUDI;
 import static org.apache.amoro.TableFormat.ICEBERG;
 import static org.apache.amoro.TableFormat.MIXED_HIVE;
 import static org.apache.amoro.TableFormat.MIXED_ICEBERG;
@@ -125,6 +126,8 @@ public class CatalogController {
         CatalogDescriptor.of(CATALOG_TYPE_HIVE, 
STORAGE_CONFIGS_VALUE_TYPE_HADOOP, MIXED_HIVE));
     VALIDATE_CATALOGS.add(
         CatalogDescriptor.of(CATALOG_TYPE_HIVE, 
STORAGE_CONFIGS_VALUE_TYPE_HADOOP, PAIMON));
+    VALIDATE_CATALOGS.add(
+        CatalogDescriptor.of(CATALOG_TYPE_HIVE, 
STORAGE_CONFIGS_VALUE_TYPE_HADOOP, HUDI));
     VALIDATE_CATALOGS.add(
         CatalogDescriptor.of(
             CATALOG_TYPE_HADOOP, STORAGE_CONFIGS_VALUE_TYPE_HADOOP, 
MIXED_ICEBERG));
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
index 8d6884f47..178c0001c 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
@@ -50,6 +50,7 @@ import 
org.apache.amoro.server.dashboard.model.PartitionFileBaseInfo;
 import org.apache.amoro.server.dashboard.model.ServerTableMeta;
 import org.apache.amoro.server.dashboard.model.TableMeta;
 import org.apache.amoro.server.dashboard.model.TableOperation;
+import org.apache.amoro.server.dashboard.model.TableSummary;
 import org.apache.amoro.server.dashboard.model.TagOrBranchInfo;
 import org.apache.amoro.server.dashboard.model.UpgradeHiveMeta;
 import org.apache.amoro.server.dashboard.model.UpgradeRunningInfo;
@@ -140,17 +141,16 @@ public class TableController {
     ServerTableMeta serverTableMeta =
         tableDescriptor.getTableDetail(
             TableIdentifier.of(catalog, database, 
tableName).buildTableIdentifier());
-    Map<String, Object> tableSummary = serverTableMeta.getTableSummary();
+    TableSummary tableSummary = serverTableMeta.getTableSummary();
     Optional<ServerTableIdentifier> serverTableIdentifier =
         Optional.ofNullable(
             tableService.getServerTableIdentifier(
                 TableIdentifier.of(catalog, database, 
tableName).buildTableIdentifier()));
     if (serverTableIdentifier.isPresent()) {
-      tableSummary.put(
-          "optimizingStatus",
-          
tableService.getRuntime(serverTableIdentifier.get()).getOptimizingStatus());
+      TableRuntime tableRuntime = 
tableService.getRuntime(serverTableIdentifier.get());
+      
tableSummary.setOptimizingStatus(tableRuntime.getOptimizingStatus().name());
     } else {
-      tableSummary.put("optimizingStatus", OptimizingStatus.IDLE);
+      tableSummary.setOptimizingStatus(OptimizingStatus.IDLE.name());
     }
     ctx.json(OkResponse.of(serverTableMeta));
   }
@@ -346,7 +346,7 @@ public class TableController {
     TableIdentifier tableIdentifier = TableIdentifier.of(catalog, db, table);
     List<OptimizingTaskInfo> optimizingTaskInfos =
         tableDescriptor.getOptimizingProcessTaskInfos(
-            tableIdentifier.buildTableIdentifier(), Long.parseLong(processId));
+            tableIdentifier.buildTableIdentifier(), processId);
 
     PageResult<OptimizingTaskInfo> pageResult = 
PageResult.of(optimizingTaskInfos, offset, limit);
     ctx.json(OkResponse.of(pageResult));
@@ -396,8 +396,7 @@ public class TableController {
 
     List<PartitionFileBaseInfo> result =
         tableDescriptor.getSnapshotDetail(
-            TableIdentifier.of(catalog, database, 
tableName).buildTableIdentifier(),
-            Long.parseLong(snapshotId));
+            TableIdentifier.of(catalog, database, 
tableName).buildTableIdentifier(), snapshotId);
     int offset = (page - 1) * pageSize;
     PageResult<PartitionFileBaseInfo> amsPageResult = PageResult.of(result, 
offset, pageSize);
     ctx.json(OkResponse.of(amsPageResult));
@@ -501,6 +500,8 @@ public class TableController {
               return TableMeta.TableType.PAIMON.toString();
             case ICEBERG:
               return TableMeta.TableType.ICEBERG.toString();
+            case HUDI:
+              return TableMeta.TableType.HUDI.toString();
             default:
               throw new IllegalStateException("Unknown format");
           }
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/OptimizingProcessInfo.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/OptimizingProcessInfo.java
index b8b789b2c..f7f258790 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/OptimizingProcessInfo.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/OptimizingProcessInfo.java
@@ -33,7 +33,7 @@ public class OptimizingProcessInfo {
   private String dbName;
   private String tableName;
 
-  private Long processId;
+  private String processId;
   private long startTime;
   private OptimizingType optimizingType;
   private OptimizingProcess.Status status;
@@ -79,11 +79,11 @@ public class OptimizingProcessInfo {
     this.tableName = tableName;
   }
 
-  public Long getProcessId() {
+  public String getProcessId() {
     return processId;
   }
 
-  public void setProcessId(Long processId) {
+  public void setProcessId(String processId) {
     this.processId = processId;
   }
 
@@ -219,7 +219,7 @@ public class OptimizingProcessInfo {
     result.setDbName(meta.getDbName());
     result.setTableName(meta.getTableName());
 
-    result.setProcessId(meta.getProcessId());
+    result.setProcessId(String.valueOf(meta.getProcessId()));
     result.setStartTime(meta.getPlanTime());
     result.setOptimizingType(meta.getOptimizingType());
     result.setStatus(meta.getStatus());
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/OptimizingTaskInfo.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/OptimizingTaskInfo.java
index 00f27c7a7..c4acb0b0c 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/OptimizingTaskInfo.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/OptimizingTaskInfo.java
@@ -28,7 +28,7 @@ public class OptimizingTaskInfo {
   public static String OPTIMIZER_TOKEN_PROP = "optimizer.token";
   public static String OPTIMIZER_THREAD_ID_PROP = "optimizer.thread-id";
   private Long tableId;
-  private Long processId;
+  private String processId;
   private int taskId;
   private String partitionData;
   private TaskRuntime.Status status;
@@ -46,7 +46,7 @@ public class OptimizingTaskInfo {
 
   public OptimizingTaskInfo(
       Long tableId,
-      Long processId,
+      String processId,
       int taskId,
       String partitionData,
       TaskRuntime.Status status,
@@ -98,11 +98,11 @@ public class OptimizingTaskInfo {
     this.tableId = tableId;
   }
 
-  public Long getProcessId() {
+  public String getProcessId() {
     return processId;
   }
 
-  public void setProcessId(Long processId) {
+  public void setProcessId(String processId) {
     this.processId = processId;
   }
 
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ServerTableMeta.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ServerTableMeta.java
index ee137982b..9394c6535 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ServerTableMeta.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/ServerTableMeta.java
@@ -35,7 +35,7 @@ public class ServerTableMeta {
   private Map<String, String> properties;
   private Map<String, Object> changeMetrics;
   private Map<String, Object> baseMetrics;
-  private Map<String, Object> tableSummary;
+  private TableSummary tableSummary;
   private String baseLocation;
   private String filter;
   private long createTime;
@@ -158,11 +158,11 @@ public class ServerTableMeta {
     this.baseMetrics = baseMetrics;
   }
 
-  public Map<String, Object> getTableSummary() {
+  public TableSummary getTableSummary() {
     return tableSummary;
   }
 
-  public void setTableSummary(Map<String, Object> tableSummary) {
+  public void setTableSummary(TableSummary tableSummary) {
     this.tableSummary = tableSummary;
   }
 
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/TableMeta.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/TableMeta.java
index b70fe01bd..dead0be96 100644
--- 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/TableMeta.java
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/TableMeta.java
@@ -66,7 +66,8 @@ public class TableMeta {
     ARCTIC("arctic"),
     HIVE("hive"),
     ICEBERG("iceberg"),
-    PAIMON("paimon");
+    PAIMON("paimon"),
+    HUDI("hudi");
 
     private final String name;
 
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/TableSummary.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/TableSummary.java
new file mode 100644
index 000000000..1ffdf853e
--- /dev/null
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/dashboard/model/TableSummary.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.dashboard.model;
+
+/** Table summary shown on the table details page. */
+public class TableSummary {
+  private long file;
+  private String size;
+  private String averageFile;
+  private String tableFormat;
+  private long records;
+  private String optimizingStatus;
+
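+  /** No-arg constructor, kept for serialization frameworks. */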
+  public TableSummary() {}
+
+  public TableSummary(
+      long file, String size, String averageFile, long records, String 
tableFormat) {
+    this.file = file;
+    this.size = size;
+    this.averageFile = averageFile;
+    this.records = records;
+    this.tableFormat = tableFormat;
+  }
+
+  /** Total size of the table, in human-readable form. */
+  public String getSize() {
+    return size;
+  }
+
+  /** Average file size, in human-readable form. */
+  public String getAverageFile() {
+    return averageFile;
+  }
+
+  /** Format name of the table. */
+  public String getTableFormat() {
+    return tableFormat;
+  }
+
+  /** Total number of files. */
+  public long getFile() {
+    return file;
+  }
+
+  /** Total number of records in the table. */
+  public long getRecords() {
+    return records;
+  }
+
+  /** Current optimizing status of the table. */
+  public String getOptimizingStatus() {
+    return optimizingStatus;
+  }
+
+  public void setOptimizingStatus(String optimizingStatus) {
+    this.optimizingStatus = optimizingStatus;
+  }
+}
diff --git 
a/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/utils/HudiTableUtil.java
 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/utils/HudiTableUtil.java
new file mode 100644
index 000000000..6a3c93f7d
--- /dev/null
+++ 
b/amoro-ams/amoro-ams-server/src/main/java/org/apache/amoro/server/utils/HudiTableUtil.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.utils;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Common utilities for Hudi tables. */
+public class HudiTableUtil {
+
+  public static String convertAvroSchemaToFieldType(Schema schema) {
+    switch (schema.getType()) {
+      case ENUM:
+      case STRING:
+        // convert Avro's Utf8/CharSequence to String
+        return "STRING";
+      case ARRAY:
+        return "ARRAY<" + 
convertAvroSchemaToFieldType(schema.getElementType()) + ">";
+      case MAP:
+        return "MAP<STRING, " + 
convertAvroSchemaToFieldType(schema.getValueType()) + ">";
+      case UNION:
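+        // unwrap nullable unions such as ["null", T] or [T, "null"] to the non-null branch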
+        final Schema actualSchema;
+        if (schema.getTypes().size() == 2
+            && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+          actualSchema = schema.getTypes().get(1);
+        } else if (schema.getTypes().size() == 2
+            && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+          actualSchema = schema.getTypes().get(0);
+        } else if (schema.getTypes().size() == 1) {
+          actualSchema = schema.getTypes().get(0);
+        } else {
+          return "";
+        }
+
+        return convertAvroSchemaToFieldType(actualSchema);
+      case FIXED:
+        // logical decimal type
+        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) 
schema.getLogicalType();
+          return "DECIMAL(" + decimalType.getPrecision() + ", " + 
decimalType.getScale() + ")";
+        }
+        // convert fixed size binary data to primitive byte arrays
+        return "VARBINARY(" + schema.getFixedSize() + ")";
+      case BYTES:
+        // logical decimal type
+        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) 
schema.getLogicalType();
+          return "DECIMAL(" + decimalType.getPrecision() + ", " + decimalType 
+ ")";
+        }
+        return "BYTES";
+      case INT:
+        // logical date and time type
+        final org.apache.avro.LogicalType logicalType = 
schema.getLogicalType();
+        if (logicalType == LogicalTypes.date()) {
+          return "DATE";
+        } else if (logicalType == LogicalTypes.timeMillis()) {
+          return "TIME";
+        }
+        return "INT";
+      case LONG:
+        // logical timestamp type
+        if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+          return "TIMESTAMP";
+        } else if (schema.getLogicalType() == 
LogicalTypes.localTimestampMillis()) {
+          return "TIMESTAMP_WITH_LOCAL_TIME_ZONE";
+        } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+          return "TIMESTAMP";
+        } else if (schema.getLogicalType() == 
LogicalTypes.localTimestampMicros()) {
+          return "TIMESTAMP_WITH_LOCAL_TIME_ZONE";
+        } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
+          return "TIME";
+        } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+          return "TIME";
+        }
+        return "BIGINT";
+      case FLOAT:
+        return "FLOAT";
+      case DOUBLE:
+        return "DOUBLE";
+      case BOOLEAN:
+        return "BOOLEAN";
+      case NULL:
+        return "NULL";
+      default:
+        throw new IllegalArgumentException("Unsupported Avro type '" + 
schema.getType() + "'.");
+    }
+  }
+
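+  /** File count and size statistics for the base and log files of a single partition. */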
+  public static class HoodiePartitionMetric {
+    private long totalBaseFileSizeInBytes;
+    private long totalLogFileSizeInBytes;
+    private long baseFileCount;
+    private long logFileCount;
+
+    public HoodiePartitionMetric(
+        int baseFileCount,
+        int logFileCount,
+        long totalBaseFileSizeInBytes,
+        long totalLogFileSizeInBytes) {
+      this.baseFileCount = baseFileCount;
+      this.logFileCount = logFileCount;
+      this.totalBaseFileSizeInBytes = totalBaseFileSizeInBytes;
+      this.totalLogFileSizeInBytes = totalLogFileSizeInBytes;
+    }
+
+    public long getTotalBaseFileSizeInBytes() {
+      return totalBaseFileSizeInBytes;
+    }
+
+    public long getTotalLogFileSizeInBytes() {
+      return totalLogFileSizeInBytes;
+    }
+
+    public long getBaseFileCount() {
+      return baseFileCount;
+    }
+
+    public long getLogFileCount() {
+      return logFileCount;
+    }
+  }
+
+  public static Map<String, HoodiePartitionMetric> statisticPartitionsMetric(
+      List<String> partitionPaths,
+      SyncableFileSystemView fileSystemView,
+      ExecutorService ioExecutors) {
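+    // submit one async metric computation per partition, then join the results in order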
+    Map<String, CompletableFuture<HoodiePartitionMetric>> futures = new 
HashMap<>();
+    for (String path : partitionPaths) {
+      CompletableFuture<HoodiePartitionMetric> f =
+          CompletableFuture.supplyAsync(
+              () -> getPartitionMetric(path, fileSystemView), ioExecutors);
+      futures.put(path, f);
+    }
+
+    Map<String, HoodiePartitionMetric> results = new HashMap<>();
+    for (String path : partitionPaths) {
+      CompletableFuture<HoodiePartitionMetric> future = futures.get(path);
+      try {
+        HoodiePartitionMetric metric = future.get();
+        results.put(path, metric);
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException("Error when statistic partition: " + path + 
" metrics", e);
+      }
+    }
+    return results;
+  }
+
+  private static HoodiePartitionMetric getPartitionMetric(
+      String partitionPath, SyncableFileSystemView fsView) {
+    List<FileSlice> latestSlices =
+        fsView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
+    // Total size and file count of base/log files in the latest file slices
+    long totalBaseFileSizeInBytes = 0;
+    long totalLogFileSizeInBytes = 0;
+    int baseFileCount = 0;
+    int logFileCount = 0;
+
+    for (FileSlice slice : latestSlices) {
+      if (slice.getBaseFile().isPresent()) {
+        totalBaseFileSizeInBytes += 
slice.getBaseFile().get().getFileStatus().getLen();
+        ++baseFileCount;
+      }
+      Iterator<HoodieLogFile> it = slice.getLogFiles().iterator();
+      while (it.hasNext()) {
+        totalLogFileSizeInBytes += it.next().getFileSize();
+        ++logFileCount;
+      }
+    }
+
+    return new HoodiePartitionMetric(
+        baseFileCount, logFileCount, totalBaseFileSizeInBytes, 
totalLogFileSizeInBytes);
+  }
+}
diff --git a/amoro-core/pom.xml b/amoro-core/pom.xml
index 1c23b2a62..52c1483c2 100644
--- a/amoro-core/pom.xml
+++ b/amoro-core/pom.xml
@@ -59,6 +59,23 @@
             <artifactId>caffeine</artifactId>
         </dependency>
 
+        <!--  apache hudi dependencies -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-java-client</artifactId>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.amoro</groupId>
             <artifactId>amoro-shade-zookeeper-3</artifactId>
@@ -140,6 +157,17 @@
             <artifactId>RoaringBitmap</artifactId>
         </dependency>
 
+        <!-- hive client dependencies -->
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-hive-metastore</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-bundled-guava</artifactId>
diff --git 
a/amoro-core/src/main/java/org/apache/amoro/CommonUnifiedCatalog.java 
b/amoro-core/src/main/java/org/apache/amoro/CommonUnifiedCatalog.java
index 35d775502..55c639523 100644
--- a/amoro-core/src/main/java/org/apache/amoro/CommonUnifiedCatalog.java
+++ b/amoro-core/src/main/java/org/apache/amoro/CommonUnifiedCatalog.java
@@ -109,7 +109,8 @@ public class CommonUnifiedCatalog implements UnifiedCatalog 
{
             TableFormat.MIXED_HIVE,
             TableFormat.MIXED_ICEBERG,
             TableFormat.ICEBERG,
-            TableFormat.PAIMON)
+            TableFormat.PAIMON,
+            TableFormat.HUDI)
         .map(
             formatCatalog -> {
               try {
@@ -135,7 +136,11 @@ public class CommonUnifiedCatalog implements 
UnifiedCatalog {
     }
     TableFormat[] formats =
         new TableFormat[] {
-          TableFormat.MIXED_HIVE, TableFormat.MIXED_ICEBERG, 
TableFormat.ICEBERG, TableFormat.PAIMON
+          TableFormat.MIXED_HIVE,
+          TableFormat.MIXED_ICEBERG,
+          TableFormat.ICEBERG,
+          TableFormat.PAIMON,
+          TableFormat.HUDI
         };
 
     Map<String, TableFormat> tableNameToFormat = Maps.newHashMap();
diff --git a/amoro-core/src/main/java/org/apache/amoro/TableFormat.java 
b/amoro-core/src/main/java/org/apache/amoro/TableFormat.java
index e566e6b30..31b94b146 100644
--- a/amoro-core/src/main/java/org/apache/amoro/TableFormat.java
+++ b/amoro-core/src/main/java/org/apache/amoro/TableFormat.java
@@ -27,7 +27,8 @@ public enum TableFormat {
   ICEBERG,
   MIXED_ICEBERG,
   MIXED_HIVE,
-  PAIMON;
+  PAIMON,
+  HUDI;
 
   public boolean in(TableFormat... tableFormats) {
     for (TableFormat tableFormat : tableFormats) {
diff --git a/amoro-core/src/main/java/org/apache/amoro/data/DataFileType.java 
b/amoro-core/src/main/java/org/apache/amoro/data/DataFileType.java
index c3b669a38..8a1a90a6a 100644
--- a/amoro-core/src/main/java/org/apache/amoro/data/DataFileType.java
+++ b/amoro-core/src/main/java/org/apache/amoro/data/DataFileType.java
@@ -38,7 +38,8 @@ public enum DataFileType {
   INSERT_FILE(1, "I"),
   EQ_DELETE_FILE(2, "ED"),
   POS_DELETE_FILE(3, "PD"),
-  ICEBERG_EQ_DELETE_FILE(4, "IED");
+  ICEBERG_EQ_DELETE_FILE(4, "IED"),
+  LOG_FILE(5, "L");
 
   private final int id;
 
diff --git 
a/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiCatalogFactory.java
 
b/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiCatalogFactory.java
new file mode 100644
index 000000000..86537594a
--- /dev/null
+++ 
b/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiCatalogFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.formats.hudi;
+
+import org.apache.amoro.FormatCatalog;
+import org.apache.amoro.FormatCatalogFactory;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.properties.CatalogMetaProperties;
+import org.apache.amoro.table.TableMetaStore;
+
+import java.util.Map;
+
+/** Hudi format catalog factory. */
+public class HudiCatalogFactory implements FormatCatalogFactory {
+  @Override
+  public FormatCatalog create(
+      String catalogName,
+      String metastoreType,
+      Map<String, String> properties,
+      TableMetaStore metaStore) {
+    if 
(CatalogMetaProperties.CATALOG_TYPE_HADOOP.equalsIgnoreCase(metastoreType)) {
+      return new HudiHadoopCatalog(catalogName, properties, metaStore);
+    } else if 
(CatalogMetaProperties.CATALOG_TYPE_HIVE.equalsIgnoreCase(metastoreType)) {
+      return new HudiHiveCatalog(catalogName, properties, metaStore);
+    }
+    throw new IllegalArgumentException(
+        "Un-supported metastore type:" + metastoreType + " for Hudi");
+  }
+
+  @Override
+  public TableFormat format() {
+    return TableFormat.HUDI;
+  }
+
+  @Override
+  public Map<String, String> convertCatalogProperties(
+      String catalogName, String metastoreType, Map<String, String> 
unifiedCatalogProperties) {
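+    // Hudi needs no property translation; the unified catalog properties are used as-is.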
+    return unifiedCatalogProperties;
+  }
+}
diff --git 
a/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiHadoopCatalog.java 
b/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiHadoopCatalog.java
new file mode 100644
index 000000000..2e696d3d2
--- /dev/null
+++ 
b/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiHadoopCatalog.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.formats.hudi;
+
+import org.apache.amoro.AmoroTable;
+import org.apache.amoro.DatabaseNotEmptyException;
+import org.apache.amoro.FormatCatalog;
+import org.apache.amoro.NoSuchTableException;
+import org.apache.amoro.properties.CatalogMetaProperties;
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.table.TableIdentifier;
+import org.apache.amoro.table.TableMetaStore;
+import org.apache.amoro.utils.MixedCatalogUtil;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieJavaTable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** Hudi catalog implementation backed by a Hadoop filesystem. */
+public class HudiHadoopCatalog implements FormatCatalog {
+  private final TableMetaStore metaStore;
+  private final String catalog;
+  private final Map<String, String> properties;
+  private final Path warehouse;
+
+  protected HudiHadoopCatalog(
+      String catalog, Map<String, String> catalogProperties, TableMetaStore 
metaStore) {
+    this.catalog = catalog;
+    this.metaStore = metaStore;
+    this.properties =
+        catalogProperties == null
+            ? Collections.emptyMap()
+            : Collections.unmodifiableMap(catalogProperties);
+
+    Preconditions.checkArgument(
+        this.properties.containsKey(CatalogMetaProperties.KEY_WAREHOUSE),
+        "Lack required property: {}",
+        CatalogMetaProperties.KEY_WAREHOUSE);
+    String warehouse = 
this.properties.get(CatalogMetaProperties.KEY_WAREHOUSE);
+    this.warehouse = new Path(warehouse);
+  }
+
+  @Override
+  public List<String> listDatabases() {
+    return metaStore.doAs(
+        () -> {
+          FileSystem fs = fs();
+          FileStatus[] fileStatuses = fs.listStatus(warehouse);
+          if (fileStatuses == null || fileStatuses.length == 0) {
+            return Lists.newArrayList();
+          }
+          List<String> databases = Lists.newArrayList();
+          for (FileStatus s : fileStatuses) {
+            if (!s.isDirectory()) {
+              continue;
+            }
+            databases.add(s.getPath().getName());
+          }
+          return databases;
+        });
+  }
+
+  @Override
+  public boolean databaseExists(String database) {
+    return metaStore.doAs(
+        () -> {
+          FileSystem fs = fs();
+          return fs.exists(new Path(warehouse, database));
+        });
+  }
+
+  @Override
+  public boolean tableExists(String database, String table) {
+    try {
+      loadTableLocation(database, table);
+      return true;
+    } catch (NoSuchTableException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public void createDatabase(String database) {
+    metaStore.doAs(
+        () -> {
+          FileSystem fs = fs();
+          fs.mkdirs(new Path(warehouse, database));
+          return null;
+        });
+  }
+
+  @Override
+  public void dropDatabase(String database) {
+    List<String> tables = listTables(database);
+    if (!tables.isEmpty()) {
+      throw new DatabaseNotEmptyException("Database: " + database + " is not 
empty");
+    }
+    metaStore.doAs(
+        () -> {
+          FileSystem fs = fs();
+          Path path = new Path(warehouse, database);
+          fs.delete(path, true);
+          return null;
+        });
+  }
+
+  @Override
+  public boolean dropTable(String database, String table, boolean purge) {
+    Path databasePath = new Path(warehouse, database);
+    Path tablePath = new Path(databasePath, table);
+    return metaStore.doAs(
+        () -> {
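+          // by default only the Hudi metadata directory is removed; purge deletes the whole table path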
+          Path dropPath = new Path(tablePath, ".hoodie");
+          if (purge) {
+            dropPath = tablePath;
+          }
+          try {
+            FileSystem fs = fs();
+            return fs.delete(dropPath, true);
+          } catch (IOException e) {
+            return false;
+          }
+        });
+  }
+
+  @Override
+  public List<String> listTables(String database) {
+    return metaStore.doAs(
+        () -> {
+          FileSystem fs = fs();
+          Path databasePath = new Path(warehouse, database);
+          FileStatus[] items = fs.listStatus(databasePath);
+          if (items == null || items.length == 0) {
+            return Lists.newArrayList();
+          }
+          List<String> hoodieTables = Lists.newArrayList();
+          for (FileStatus fileStatus : items) {
+            if (fileStatus.isDirectory()) {
+              Path tablePath = fileStatus.getPath();
+              if (isHoodieTableBase(fs, tablePath)) {
+                hoodieTables.add(tablePath.getName());
+              }
+            }
+          }
+          return hoodieTables;
+        });
+  }
+
+  @Override
+  public AmoroTable<?> loadTable(String database, String table) {
+    String tableLocation = loadTableLocation(database, table);
+    HoodieJavaEngineContext context = new 
HoodieJavaEngineContext(metaStore.getConfiguration());
+    HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(tableLocation).build();
+
+    return metaStore.doAs(
+        () -> {
+          HoodieJavaTable hoodieTable = HoodieJavaTable.create(config, 
context);
+          TableIdentifier identifier = TableIdentifier.of(catalog, database, 
table);
+          Map<String, String> tableProperties =
+              hoodieTable.getMetaClient().getTableConfig().propsMap();
+          Map<String, String> properties =
+              MixedCatalogUtil.mergeCatalogPropertiesToTable(tableProperties, 
this.properties);
+          return new HudiTable(identifier, hoodieTable, properties);
+        });
+  }
+
+  private String loadTableLocation(String database, String table) {
+    Path databasePath = new Path(warehouse, database);
+    Path tablePath = new Path(databasePath, table);
+    return metaStore.doAs(
+        () -> {
+          FileSystem fs = fs();
+          if (isHoodieTableBase(fs, tablePath)) {
+            return tablePath.toString();
+          }
+          throw new NoSuchTableException(database + "." + table + " does not 
exist");
+        });
+  }
+
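+  /** A path is the base of a Hudi table if it contains a ".hoodie" metadata directory. */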
+  private boolean isHoodieTableBase(FileSystem fs, Path path) throws 
IOException {
+    try {
+      Path metadataPath = new Path(path, ".hoodie");
+      FileStatus status = fs.getFileStatus(metadataPath);
+      return status.isDirectory();
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+  }
+
+  private FileSystem fs() {
+    return FSUtils.getFs(warehouse, metaStore.getConfiguration());
+  }
+}
diff --git 
a/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiHiveCatalog.java 
b/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiHiveCatalog.java
new file mode 100644
index 000000000..ba2a40302
--- /dev/null
+++ 
b/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiHiveCatalog.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.formats.hudi;
+
+import org.apache.amoro.AmoroTable;
+import org.apache.amoro.FormatCatalog;
+import org.apache.amoro.NoSuchDatabaseException;
+import org.apache.amoro.hive.CachedHiveClientPool;
+import org.apache.amoro.hive.HMSClient;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.table.TableIdentifier;
+import org.apache.amoro.table.TableMetaStore;
+import org.apache.amoro.utils.MixedCatalogUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieJavaTable;
+import org.apache.thrift.TException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
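+/** Hudi catalog implementation backed by the Hive Metastore. */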
+public class HudiHiveCatalog implements FormatCatalog {
+  private final TableMetaStore metaStore;
+  private final String catalog;
+  private final Map<String, String> properties;
+  private final CachedHiveClientPool hiveClientPool;
+
+  protected HudiHiveCatalog(
+      String catalog, Map<String, String> catalogProperties, TableMetaStore 
metaStore) {
+    this.catalog = catalog;
+    this.metaStore = metaStore;
+    this.properties =
+        catalogProperties == null
+            ? Collections.emptyMap()
+            : Collections.unmodifiableMap(catalogProperties);
+    this.hiveClientPool = new CachedHiveClientPool(metaStore, 
catalogProperties);
+  }
+
+  @Override
+  public List<String> listDatabases() {
+    try {
+      return hiveClientPool.run(HMSClient::getAllDatabases);
+    } catch (TException | InterruptedException e) {
+      throw new RuntimeException("Failed to fetch database from HMS", e);
+    }
+  }
+
+  @Override
+  public boolean databaseExists(String database) {
+    try {
+      hiveClientPool.run(client -> client.getDatabase(database));
+      return true;
+    } catch (NoSuchObjectException e) {
+      return false;
+    } catch (TException | InterruptedException e) {
+      throw new RuntimeException("Failed to fetch database from HMS", e);
+    }
+  }
+
+  @Override
+  public boolean tableExists(String database, String table) {
+    return loadHoodieHiveTable(database, table).isPresent();
+  }
+
+  @Override
+  public void createDatabase(String database) {
+    try {
+      Database hiveDatabase = new Database();
+      hiveDatabase.setName(database);
+      hiveClientPool.run(
+          client -> {
+            client.createDatabase(hiveDatabase);
+            return null;
+          });
+    } catch (AlreadyExistsException e) {
+      throw new org.apache.amoro.AlreadyExistsException("Database: " + database 
+ " already exists");
+    } catch (TException | InterruptedException e) {
+      throw new RuntimeException("Failed to create database from HMS", e);
+    }
+  }
+
+  @Override
+  public void dropDatabase(String database) {
+    try {
+      hiveClientPool.run(
+          client -> {
+            client.dropDatabase(database, false, false, false);
+            return null;
+          });
+    } catch (NoSuchObjectException e) {
+      // pass
+    } catch (TException | InterruptedException e) {
+      throw new RuntimeException("Failed to drop database from HMS", e);
+    }
+  }
+
+  @Override
+  public boolean dropTable(String database, String table, boolean purge) {
+    Optional<Table> optHiveTable = loadHoodieHiveTable(database, table);
+    if (!optHiveTable.isPresent()) {
+      return false;
+    }
+    Table hiveTable = optHiveTable.get();
+    String location = hiveTable.getSd().getLocation();
+    try {
+      hiveClientPool.run(
+          client -> {
+            client.dropTable(database, table, purge, true);
+            return null;
+          });
+    } catch (TException | InterruptedException e) {
+      throw new RuntimeException("Failed to drop database from HMS", e);
+    }
+    if (purge) {
+      metaStore.doAs(
+          () -> {
+            Path tableLocation = new Path(location);
+            FileSystem fs = FSUtils.getFs(tableLocation, 
metaStore.getConfiguration());
+            fs.delete(tableLocation, true);
+            return null;
+          });
+    }
+    return true;
+  }
+
+  @Override
+  public List<String> listTables(String database) {
+    List<String> hiveTableLists;
+    try {
+      hiveTableLists = hiveClientPool.run(client -> 
client.getAllTables(database));
+    } catch (NoSuchObjectException e) {
+      throw new NoSuchDatabaseException("Database: " + database + " dose not 
exists.");
+    } catch (TException | InterruptedException e) {
+      throw new RuntimeException("Failed to drop database from HMS", e);
+    }
+    return hiveTableLists.stream()
+        .filter(table -> loadHoodieHiveTable(database, table).isPresent())
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public AmoroTable<?> loadTable(String database, String table) {
+    Optional<Table> hoodieHiveTable = loadHoodieHiveTable(database, table);
+    if (!hoodieHiveTable.isPresent()) {
+      throw new NoSuchDatabaseException(
+          "Hoodie table: " + database + "." + table + " dose not exists");
+    }
+    Table hiveTable = hoodieHiveTable.get();
+    String tableLocation = hiveTable.getSd().getLocation();
+    HoodieJavaEngineContext context = new 
HoodieJavaEngineContext(metaStore.getConfiguration());
+    HoodieWriteConfig config =
+        HoodieWriteConfig.newBuilder()
+            .withPath(tableLocation)
+            
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
+            .build();
+
+    return metaStore.doAs(
+        () -> {
+          HoodieJavaTable hoodieTable = HoodieJavaTable.create(config, 
context);
+          TableIdentifier identifier = TableIdentifier.of(catalog, database, 
table);
+          Map<String, String> tableProperties = hiveTable.getParameters();
+          tableProperties = filterEngineProperties(tableProperties);
+          Map<String, String> properties =
+              MixedCatalogUtil.mergeCatalogPropertiesToTable(tableProperties, 
this.properties);
+          return new HudiTable(identifier, hoodieTable, properties);
+        });
+  }
+
+  private Optional<Table> loadHoodieHiveTable(String database, String table) {
+    try {
+      Table hiveTable = hiveClientPool.run(client -> client.getTable(database, 
table));
+      if (isHoodieTable(hiveTable)) {
+        return Optional.of(hiveTable);
+      } else {
+        return Optional.empty();
+      }
+    } catch (NoSuchObjectException e) {
+      return Optional.empty();
+    } catch (TException | InterruptedException e) {
+      throw new RuntimeException("Failed to fetch database from HMS", e);
+    }
+  }
+
+  static final String SPARK_SOURCE_PROVIDER = "spark.sql.sources.provider";
+  static final String FLINK_CONNECTOR = "connector";
+
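+  /** A Hive table is treated as a Hudi table if its Spark provider or Flink connector property is "hudi". */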
+  private boolean isHoodieTable(Table table) {
+    return 
"hudi".equalsIgnoreCase(table.getParameters().getOrDefault(SPARK_SOURCE_PROVIDER,
 ""))
+        || 
"hudi".equalsIgnoreCase(table.getParameters().getOrDefault(FLINK_CONNECTOR, 
""));
+  }
+
+  private Map<String, String> filterEngineProperties(Map<String, String> 
properties) {
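+    // strip engine bookkeeping properties (e.g. Spark schema metadata) from the exposed table properties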
+    Set<String> enginePropertyKeys =
+        Sets.newHashSet(
+            "transient_lastDdlTime",
+            "spark.sql.sources.schema.part.",
+            "spark.sql.sources.schema.partCol.",
+            "spark.sql.sources.schema.numPartCols",
+            "spark.sql.sources.schema.numParts",
+            "spark.sql.sources.provider",
+            "spark.sql.create.version");
+    Map<String, String> filteredProperties = new HashMap<>();
+    properties.forEach(
+        (k, v) -> {
+          boolean exists = enginePropertyKeys.stream().anyMatch(k::startsWith);
+          if (!exists) {
+            filteredProperties.put(k, v);
+          }
+        });
+    return filteredProperties;
+  }
+}
diff --git 
a/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiSnapshot.java 
b/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiSnapshot.java
new file mode 100644
index 000000000..789bc85b5
--- /dev/null
+++ b/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiSnapshot.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.formats.hudi;
+
+import java.util.Map;
+
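+/** Snapshot information of a Hudi table, built from one completed instant on the timeline. */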
+public class HudiSnapshot {
+
+  private final String snapshotId;
+  private final long commitTimestamp;
+
+  private final String operationType;
+  private final String operation;
+
+  private final int totalFileCount;
+
+  private final int baseFileCount;
+  private final int logFileCount;
+
+  private final long totalFileSize;
+  private final long totalRecordCount;
+
+  private final Map<String, String> summary;
+
+  public HudiSnapshot(
+      String snapshotId,
+      long commitTimestamp,
+      String operationType,
+      String operation,
+      int totalFileCount,
+      int baseFileCount,
+      int logFileCount,
+      long totalFileSize,
+      long totalRecordCount,
+      Map<String, String> summary) {
+    this.snapshotId = snapshotId;
+    this.commitTimestamp = commitTimestamp;
+    this.operationType = operationType;
+    this.operation = operation;
+    this.totalFileCount = totalFileCount;
+    this.baseFileCount = baseFileCount;
+    this.logFileCount = logFileCount;
+    this.totalFileSize = totalFileSize;
+    this.totalRecordCount = totalRecordCount;
+    this.summary = summary;
+  }
+
+  public String getSnapshotId() {
+    return snapshotId;
+  }
+
+  public long getCommitTimestamp() {
+    return commitTimestamp;
+  }
+
+  public String getOperationType() {
+    return operationType;
+  }
+
+  public String getOperation() {
+    return operation;
+  }
+
+  public int getTotalFileCount() {
+    return totalFileCount;
+  }
+
+  public int getBaseFileCount() {
+    return baseFileCount;
+  }
+
+  public int getLogFileCount() {
+    return logFileCount;
+  }
+
+  public long getTotalFileSize() {
+    return totalFileSize;
+  }
+
+  public long getTotalRecordCount() {
+    return totalRecordCount;
+  }
+
+  public Map<String, String> getSummary() {
+    return summary;
+  }
+}
diff --git 
a/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiTable.java 
b/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiTable.java
new file mode 100644
index 000000000..e87704d00
--- /dev/null
+++ b/amoro-core/src/main/java/org/apache/amoro/formats/hudi/HudiTable.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.formats.hudi;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.INDEXING_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LOG_COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
+
+import org.apache.amoro.AmoroTable;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.TableSnapshot;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
+import org.apache.amoro.table.TableIdentifier;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.table.HoodieJavaTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class HudiTable implements AmoroTable<HoodieJavaTable> {
+  private static final Logger LOG = LoggerFactory.getLogger(HudiTable.class);
+  private final TableIdentifier identifier;
+  private final HoodieJavaTable hoodieTable;
+  private final Map<String, String> tableProperties;
+  private transient List<String> partitions;
+
+  private transient Map<String, HudiSnapshot> snapshots;
+
+  public HudiTable(
+      TableIdentifier identifier,
+      HoodieJavaTable hoodieTable,
+      Map<String, String> tableProperties) {
+    this.identifier = identifier;
+    this.hoodieTable = hoodieTable;
+    this.tableProperties =
+        tableProperties == null
+            ? Collections.emptyMap()
+            : Collections.unmodifiableMap(tableProperties);
+  }
+
+  @Override
+  public TableIdentifier id() {
+    return identifier;
+  }
+
+  @Override
+  public TableFormat format() {
+    return TableFormat.HUDI;
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return tableProperties;
+  }
+
+  @Override
+  public HoodieJavaTable originalTable() {
+    return hoodieTable;
+  }
+
+  @Override
+  public TableSnapshot currentSnapshot() {
+    throw new IllegalStateException("The method is not implemented.");
+  }
+
+  public List<HudiSnapshot> getSnapshotList(Executor ioExecutors) {
+    return new ArrayList<>(getSnapshots(ioExecutors).values());
+  }
+
+  public synchronized List<String> getPartitions() {
+    ensurePartitionLoaded();
+    return new ArrayList<>(this.partitions);
+  }
+
+  private synchronized Map<String, HudiSnapshot> getSnapshots(Executor ioExecutors) {
+    if (snapshots == null) {
+      ensurePartitionLoaded();
+      this.snapshots = constructSnapshots(ioExecutors);
+    }
+    return snapshots;
+  }
+
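+  // Timeline actions emitted by table-maintenance processes (cleaning, compaction,
+  // replace commits, indexing, and log compaction).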
+  private static final Set<String> OPTIMIZING_INSTANT_TYPES =
+      Sets.newHashSet(
+          CLEAN_ACTION,
+          COMPACTION_ACTION,
+          REPLACE_COMMIT_ACTION,
+          INDEXING_ACTION,
+          LOG_COMPACTION_ACTION);
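+  // Timeline actions classified as regular (non-optimizing) operations.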
+  private static final Set<String> WRITE_INSTANT_TYPES =
+      Sets.newHashSet(
+          COMMIT_ACTION,
+          DELTA_COMMIT_ACTION,
+          REPLACE_COMMIT_ACTION,
+          SAVEPOINT_ACTION,
+          ROLLBACK_ACTION);
+
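+  // Write operation types that mark an instant as optimizing even when its
+  // timeline action is a regular commit.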
+  private static final Set<String> OPTIMIZING_HOODIE_OPERATION =
+      Sets.newHashSet(
+          WriteOperationType.CLUSTER.value(),
+          WriteOperationType.COMPACT.value(),
+          WriteOperationType.LOG_COMPACT.value());
+
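+  /**
+   * Builds a {@link HudiSnapshot} for every completed instant on the active timeline,
+   * loading the instant details concurrently on the given executor.
+   */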
+  private Map<String, HudiSnapshot> constructSnapshots(Executor ioExecutors) {
+    HoodieActiveTimeline timeline = hoodieTable.getActiveTimeline();
+    List<HoodieInstant> instants = timeline.filterCompletedInstants().getInstants();
+
+    return instants.stream()
+        .map(i -> CompletableFuture.supplyAsync(() -> constructSnapshot(i, timeline), ioExecutors))
+        .map(CompletableFuture::join)
+        .collect(Collectors.toMap(HudiSnapshot::getSnapshotId, Function.identity()));
+  }
+
+  private HudiSnapshot constructSnapshot(HoodieInstant instant, HoodieTimeline timeline) {
+    String snapshotId = instant.getTimestamp();
+    long timestamp = parseHoodieCommitTime(instant.getTimestamp());
+    String hoodieOperationType = null;
+    Map<String, String> summary = Collections.emptyMap();
+    Option<byte[]> optDetail = timeline.getInstantDetails(instant);
+    if (optDetail.isPresent()) {
+      byte[] detail = optDetail.get();
+      try {
+        HoodieCommitMetadata metadata =
+            HoodieCommitMetadata.fromBytes(detail, HoodieCommitMetadata.class);
+        hoodieOperationType = metadata.getOperationType().value();
+        summary = getSnapshotSummary(metadata);
+      } catch (IOException e) {
+        LOG.error("Error when fetching hoodie instant metadata", e);
+      }
+    }
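+    // Scan the latest merged file slices of each partition as of this instant to
+    // count base/log files and accumulate the total data size.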
+    SyncableFileSystemView fileSystemView = hoodieTable.getHoodieView();
+    AtomicInteger totalFileCount = new AtomicInteger(0);
+    AtomicInteger baseFileCount = new AtomicInteger(0);
+    AtomicInteger logFileCount = new AtomicInteger(0);
+    AtomicLong totalFileSize = new AtomicLong(0);
+    for (String partition : this.partitions) {
+      Stream<FileSlice> fsStream =
+          fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partition, instant.getTimestamp());
+      fsStream.forEach(
+          fs -> {
+            int fsBaseCount = fs.getBaseFile().map(f -> 1).orElse(0);
+            int fsLogFileCount = (int) fs.getLogFiles().count();
+
+            baseFileCount.addAndGet(fsBaseCount);
+            logFileCount.addAndGet(fsLogFileCount);
+            totalFileCount.addAndGet(fsBaseCount);
+            totalFileCount.addAndGet(fsLogFileCount);
+            totalFileSize.addAndGet(fs.getTotalFileSize());
+          });
+    }
+
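+    // Prefer the write operation type recorded in the commit metadata over the raw
+    // timeline action name, then classify the instant as optimizing or not.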
+    String operation = instant.getAction().toUpperCase();
+    if (hoodieOperationType != null) {
+      operation = hoodieOperationType;
+    }
+    String operationType = null;
+    if (WRITE_INSTANT_TYPES.contains(instant.getAction())) {
+      operationType = "NON_OPTIMIZING";
+    } else if (OPTIMIZING_INSTANT_TYPES.contains(instant.getAction())) {
+      operationType = "OPTIMIZING";
+    }
+
+    if (OPTIMIZING_HOODIE_OPERATION.contains(hoodieOperationType)) {
+      operationType = "OPTIMIZING";
+    }
+
+    return new HudiSnapshot(
+        snapshotId,
+        timestamp,
+        operationType,
+        operation,
+        totalFileCount.get(),
+        baseFileCount.get(),
+        logFileCount.get(),
+        totalFileSize.get(),
+        0L,
+        summary);
+  }
+
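+  /**
+   * Aggregates the per-partition write statistics of a commit into a flat summary map
+   * covering written bytes, record counts, and the touched partitions and files.
+   */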
+  private Map<String, String> getSnapshotSummary(HoodieCommitMetadata metadata) {
+    Map<String, String> summary = new HashMap<>();
+    long totalWriteBytes = 0;
+    long recordWrites = 0;
+    long recordDeletes = 0;
+    long recordInserts = 0;
+    long recordUpdates = 0;
+    Set<String> partitions = new HashSet<>();
+    Set<String> files = new HashSet<>();
+    Map<String, List<HoodieWriteStat>> hoodieWriteStats = metadata.getPartitionToWriteStats();
+    for (String partition : hoodieWriteStats.keySet()) {
+      List<HoodieWriteStat> ptWriteStat = hoodieWriteStats.get(partition);
+      partitions.add(partition);
+      for (HoodieWriteStat writeStat : ptWriteStat) {
+        totalWriteBytes += writeStat.getTotalWriteBytes();
+        recordWrites += writeStat.getNumWrites();
+        recordDeletes += writeStat.getNumDeletes();
+        recordInserts += writeStat.getNumInserts();
+        recordUpdates += writeStat.getNumUpdateWrites();
+        files.add(writeStat.getPath());
+      }
+    }
+    summary.put("write-bytes", String.valueOf(totalWriteBytes));
+    summary.put("write-records", String.valueOf(recordWrites));
+    summary.put("delete-records", String.valueOf(recordDeletes));
+    summary.put("insert-records", String.valueOf(recordInserts));
+    summary.put("update-records", String.valueOf(recordUpdates));
+    summary.put("write-partitions", String.valueOf(partitions.size()));
+    summary.put("write-files", String.valueOf(files.size()));
+    return summary;
+  }
+
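+  /** Converts a Hudi instant time string into epoch milliseconds. */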
+  private long parseHoodieCommitTime(String commitTime) {
+    try {
+      Date date = HoodieInstantTimeGenerator.parseDateFromInstantTime(commitTime);
+      return date.getTime();
+    } catch (ParseException e) {
+      throw new RuntimeException("Error when parsing timestamp: " + commitTime, e);
+    }
+  }
+
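+  /** Lazily loads and caches all partition paths from the Hudi table metadata. */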
+  private List<String> ensurePartitionLoaded() {
+    if (partitions == null) {
+      HoodieTableMetadata hoodieTableMetadata = hoodieTable.getMetadata();
+      try {
+        this.partitions = hoodieTableMetadata.getAllPartitionPaths();
+      } catch (IOException e) {
+        throw new RuntimeException("Error when loading partitions for table: " + id(), e);
+      }
+    }
+    return partitions;
+  }
+}
diff --git a/amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/amoro/hive/AuthenticatedHiveClientPool.java b/amoro-core/src/main/java/org/apache/amoro/hive/AuthenticatedHiveClientPool.java
similarity index 100%
rename from amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/amoro/hive/AuthenticatedHiveClientPool.java
rename to amoro-core/src/main/java/org/apache/amoro/hive/AuthenticatedHiveClientPool.java
diff --git a/amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/amoro/hive/CachedHiveClientPool.java b/amoro-core/src/main/java/org/apache/amoro/hive/CachedHiveClientPool.java
similarity index 100%
rename from amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/amoro/hive/CachedHiveClientPool.java
rename to amoro-core/src/main/java/org/apache/amoro/hive/CachedHiveClientPool.java
diff --git a/amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/amoro/hive/HMSClient.java b/amoro-core/src/main/java/org/apache/amoro/hive/HMSClient.java
similarity index 100%
rename from amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/amoro/hive/HMSClient.java
rename to amoro-core/src/main/java/org/apache/amoro/hive/HMSClient.java
diff --git a/amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/amoro/hive/HMSClientImpl.java b/amoro-core/src/main/java/org/apache/amoro/hive/HMSClientImpl.java
similarity index 100%
rename from amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/amoro/hive/HMSClientImpl.java
rename to amoro-core/src/main/java/org/apache/amoro/hive/HMSClientImpl.java
diff --git a/amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/amoro/hive/HMSClientPool.java b/amoro-core/src/main/java/org/apache/amoro/hive/HMSClientPool.java
similarity index 100%
rename from amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/amoro/hive/HMSClientPool.java
rename to amoro-core/src/main/java/org/apache/amoro/hive/HMSClientPool.java
diff --git a/amoro-core/src/main/resources/META-INF/services/org.apache.amoro.FormatCatalogFactory b/amoro-core/src/main/resources/META-INF/services/org.apache.amoro.FormatCatalogFactory
index 09b74768f..04900c921 100644
--- a/amoro-core/src/main/resources/META-INF/services/org.apache.amoro.FormatCatalogFactory
+++ b/amoro-core/src/main/resources/META-INF/services/org.apache.amoro.FormatCatalogFactory
@@ -19,4 +19,5 @@
 org.apache.amoro.formats.iceberg.IcebergCatalogFactory
 org.apache.amoro.formats.paimon.PaimonCatalogFactory
 org.apache.amoro.formats.mixed.MixedIcebergCatalogFactory
-org.apache.amoro.formats.mixed.MixedHiveCatalogFactory
\ No newline at end of file
+org.apache.amoro.formats.mixed.MixedHiveCatalogFactory
+org.apache.amoro.formats.hudi.HudiCatalogFactory
\ No newline at end of file
diff --git a/amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java b/amoro-mixed-format/amoro-mixed-format-hive/src/test/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
similarity index 100%
rename from amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
rename to amoro-mixed-format/amoro-mixed-format-hive/src/test/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionExpressionForMetastore.java
diff --git a/amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/hadoop/hive/ql/security/authorization/AuthorizationPreEventListener.java b/amoro-mixed-format/amoro-mixed-format-hive/src/test/java/org/apache/hadoop/hive/ql/security/authorization/AuthorizationPreEventListener.java
similarity index 100%
rename from amoro-mixed-format/amoro-mixed-format-hive/src/main/java/org/apache/hadoop/hive/ql/security/authorization/AuthorizationPreEventListener.java
rename to amoro-mixed-format/amoro-mixed-format-hive/src/test/java/org/apache/hadoop/hive/ql/security/authorization/AuthorizationPreEventListener.java
diff --git a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
index 1c06a9c8f..277225eaa 100644
--- a/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
+++ b/amoro-mixed-format/amoro-mixed-format-spark/amoro-mixed-format-spark-3-common/src/test/java/org/apache/amoro/spark/test/SparkTestContext.java
@@ -38,7 +38,9 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.thrift.TException;
 import org.junit.rules.TemporaryFolder;
 
+import java.util.Arrays;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public class SparkTestContext {
 
@@ -119,6 +121,9 @@ public class SparkTestContext {
     }
     HiveConf hiveConf = hms.getHiveConf();
     for (TableFormat format : TableFormat.values()) {
+      if (format == TableFormat.HUDI) {
+        continue;
+      }
       // create catalog for all formats in AMS with hive metastore.
       CatalogMeta hiveCatalogMeta =
           HiveCatalogTestHelper.build(hiveConf, format)
@@ -130,7 +135,13 @@ public class SparkTestContext {
     CatalogMeta allFormats =
         HiveCatalogTestHelper.build(hiveConf, TableFormat.values()[0])
             .buildCatalogMeta(warehouse.getRoot().getAbsolutePath());
-    String formats = Joiner.on(',').join(TableFormat.values());
+    // The Spark unified catalog doesn't support Hudi, so exclude it from the format list.
+    String formats =
+        Joiner.on(',')
+            .join(
+                Arrays.stream(TableFormat.values())
+                    .filter(f -> TableFormat.HUDI != f)
+                    .collect(Collectors.toList()));
     allFormats.putToCatalogProperties(CatalogMetaProperties.TABLE_FORMATS, formats);
     allFormats.setCatalogName(AMS_ALL_FORMAT_CATALOG_NAME);
     ams.getAmsHandler().createCatalog(allFormats);
diff --git a/pom.xml b/pom.xml
index 09dd2c077..75285ff3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,6 +111,7 @@
         <curator.version>5.6.0</curator.version>
         <mockito.version>4.11.0</mockito.version>
         <parquet-avro.version>1.13.1</parquet-avro.version>
+        <mysql-jdbc.version>8.0.33</mysql-jdbc.version>
         <orc-core.version>1.8.3</orc-core.version>
         <awssdk.version>2.24.12</awssdk.version>
         <terminal.spark.version>3.3.2</terminal.spark.version>
@@ -140,6 +141,7 @@
         <amoro-shade-thrift.version>0.20.0</amoro-shade-thrift.version>
         <annotation-api.version>1.3.2</annotation-api.version>
         <guava.version>32.1.1-jre</guava.version>
+        <hudi.version>0.14.1</hudi.version>
 
         <rocksdb-dependency-scope>compile</rocksdb-dependency-scope>
         <lucene-dependency-scope>compile</lucene-dependency-scope>
@@ -577,6 +579,12 @@
                 <version>${derby-jdbc.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>mysql</groupId>
+                <artifactId>mysql-connector-java</artifactId>
+                <version>${mysql-jdbc.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-dbcp2</artifactId>
@@ -812,6 +820,13 @@
                 <version>${annotation-api.version}</version>
             </dependency>
 
+            <!-- Apache Hudi dependency management -->
+            <dependency>
+                <groupId>org.apache.hudi</groupId>
+                <artifactId>hudi-java-client</artifactId>
+                <version>${hudi.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>junit</groupId>
                 <artifactId>junit</artifactId>
