This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new fdb07abfb [#4952] feat(hudi-catalog): add implementation of HMSBackend 
for Hudi catalog (#4942)
fdb07abfb is described below

commit fdb07abfba0676a03c7378ef1f7382471050ad38
Author: mchades <[email protected]>
AuthorDate: Fri Oct 11 17:04:44 2024 +0800

    [#4952] feat(hudi-catalog): add implementation of HMSBackend for Hudi 
catalog (#4942)
    
    ### What changes were proposed in this pull request?
    
    Support read operations (loading and listing schemas and tables) for the HMS backend of the Hudi catalog.
    
    ### Why are the changes needed?
    
    Fix: #4952
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    UTs added
---
 catalogs/catalog-lakehouse-hudi/build.gradle.kts   |  99 +++++++++++
 .../lakehouse/hudi/HudiCatalogOperations.java      |   5 +-
 .../hudi/HudiCatalogPropertiesMetadata.java        |  33 +++-
 .../catalog/lakehouse/hudi/HudiColumn.java         |  61 +++++++
 .../catalog/lakehouse/hudi/HudiSchema.java         |   5 +
 .../hudi/HudiSchemaPropertiesMetadata.java         |  17 +-
 .../catalog/lakehouse/hudi/HudiTable.java          |   5 +
 .../hudi/HudiTablePropertiesMetadata.java          |  36 +++-
 .../lakehouse/hudi/backend/HudiCatalogBackend.java |  10 +-
 .../lakehouse/hudi/backend/hms/HudiHMSBackend.java |   3 +-
 .../hudi/backend/hms/HudiHMSBackendOps.java        | 150 ++++++++++++++++-
 .../lakehouse/hudi/backend/hms/HudiHMSSchema.java  |  20 ++-
 .../lakehouse/hudi/backend/hms/HudiHMSTable.java   |  30 +++-
 .../catalog/lakehouse/hudi/utils/CatalogUtils.java |  18 +-
 .../catalog/lakehouse/hudi/TestHudiCatalog.java    |  62 +++++++
 .../lakehouse/hudi/TestHudiCatalogOperations.java  | 127 ++++++++++++++
 .../catalog/lakehouse/hudi/TestHudiSchema.java}    |  24 +--
 .../catalog/lakehouse/hudi/TestHudiTable.java}     |  28 +---
 .../hudi/backend/hms/TestHudiHMSBackend.java}      |  26 ++-
 .../hudi/backend/hms/TestHudiHMSBackendOps.java    | 184 +++++++++++++++++++++
 .../lakehouse/hudi/ops/InMemoryBackendOps.java}    |  80 +++++++--
 .../lakehouse/hudi/utils/TestCatalogUtils.java}    |  22 ++-
 .../hive/converter/HiveTableConverter.java         |   2 +-
 23 files changed, 935 insertions(+), 112 deletions(-)

diff --git a/catalogs/catalog-lakehouse-hudi/build.gradle.kts 
b/catalogs/catalog-lakehouse-hudi/build.gradle.kts
index 78ff2f8b1..eef90f029 100644
--- a/catalogs/catalog-lakehouse-hudi/build.gradle.kts
+++ b/catalogs/catalog-lakehouse-hudi/build.gradle.kts
@@ -24,18 +24,117 @@ plugins {
   id("idea")
 }
 
+val scalaVersion: String = project.properties["scalaVersion"] as? String ?: 
extra["defaultScalaVersion"].toString()
+val fullSparkVersion: String = libs.versions.spark34.get()
+val sparkVersion = fullSparkVersion.split(".").take(2).joinToString(".")
+
 dependencies {
   implementation(project(":api")) {
     exclude(group = "*")
   }
+  implementation(project(":common")) {
+    exclude(group = "*")
+  }
+  implementation(project(":catalogs:hive-metastore-common"))
   implementation(project(":core")) {
     exclude(group = "*")
   }
 
   implementation(libs.guava)
+  implementation(libs.hive2.exec) {
+    artifact {
+      classifier = "core"
+    }
+    exclude("com.google.code.findbugs", "jsr305")
+    exclude("com.google.protobuf")
+    exclude("org.apache.avro")
+    exclude("org.apache.ant")
+    exclude("org.apache.calcite")
+    exclude("org.apache.calcite.avatica")
+    exclude("org.apache.curator")
+    exclude("org.apache.derby")
+    exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager")
+    exclude("org.apache.hive", "hive-llap-tez")
+    exclude("org.apache.hive", "hive-vector-code-gen")
+    exclude("org.apache.ivy")
+    exclude("org.apache.logging.log4j")
+    exclude("org.apache.zookeeper")
+    exclude("org.codehaus.groovy", "groovy-all")
+    exclude("org.datanucleus", "datanucleus-core")
+    exclude("org.eclipse.jetty.aggregate", "jetty-all")
+    exclude("org.eclipse.jetty.orbit", "javax.servlet")
+    exclude("org.openjdk.jol")
+    exclude("org.pentaho")
+    exclude("org.slf4j")
+  }
   implementation(libs.hive2.metastore) {
+    // Transitive dependencies of hive-metastore that are excluded from this module.
+    exclude("ant")
+    exclude("co.cask.tephra")
+    exclude("com.github.joshelser")
+    exclude("com.google.code.findbugs", "jsr305")
+    exclude("com.tdunning", "json")
+    exclude("com.zaxxer", "HikariCP")
+    // NOTE(review): "io.dropwizard.metricss" looks like a misspelling of
+    // "io.dropwizard.metrics" (the spelling used by the Spark test exclusions in this
+    // file), so this exclusion presumably matches nothing. It is left unchanged because
+    // correcting it would alter the resolved classpath - confirm before fixing.
+    exclude("io.dropwizard.metricss")
+    exclude("javax.transaction", "transaction-api")
+    exclude("org.apache.ant")
+    exclude("org.apache.avro")
+    exclude("org.apache.curator")
+    exclude("org.apache.derby")
+    exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager")
+    exclude("org.apache.hbase")
+    exclude("org.apache.logging.log4j")
+    exclude("org.apache.parquet", "parquet-hadoop-bundle")
+    exclude("org.apache.zookeeper")
+    exclude("org.datanucleus")
+    exclude("org.eclipse.jetty.aggregate", "jetty-all")
+    exclude("org.eclipse.jetty.orbit", "javax.servlet")
+    exclude("org.openjdk.jol")
+    exclude("org.slf4j")
+  }
+  implementation(libs.hadoop2.common) {
     exclude("*")
   }
   implementation(libs.slf4j.api)
   implementation(libs.thrift)
+
+  compileOnly(libs.lombok)
+
+  annotationProcessor(libs.lombok)
+
+  testImplementation(project(":catalogs:hive-metastore-common", 
"testArtifacts"))
+
+  testImplementation(libs.bundles.log4j)
+  testImplementation(libs.commons.collections3)
+  testImplementation(libs.commons.configuration1)
+  testImplementation(libs.datanucleus.core)
+  testImplementation(libs.datanucleus.api.jdo)
+  testImplementation(libs.datanucleus.rdbms)
+  testImplementation(libs.datanucleus.jdo)
+  testImplementation(libs.derby)
+  testImplementation(libs.hadoop2.auth) {
+    exclude("*")
+  }
+  testImplementation(libs.hadoop2.mapreduce.client.core) {
+    exclude("*")
+  }
+  testImplementation(libs.htrace.core4)
+  testImplementation(libs.junit.jupiter.api)
+  testImplementation(libs.woodstox.core)
+  
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$fullSparkVersion")
 {
+    exclude("org.apache.hadoop")
+    exclude("io.dropwizard.metrics")
+    exclude("com.fasterxml.jackson.core")
+    exclude("com.fasterxml.jackson.module", "jackson-module-scala_2.12")
+  }
+  
testImplementation("org.apache.spark:spark-sql_$scalaVersion:$fullSparkVersion")
 {
+    exclude("org.apache.avro")
+    exclude("org.apache.hadoop")
+    exclude("org.apache.zookeeper")
+    exclude("io.dropwizard.metrics")
+    exclude("org.rocksdb")
+  }
+
+  
testRuntimeOnly("org.apache.hudi:hudi-spark$sparkVersion-bundle_$scalaVersion:0.15.0")
+  testRuntimeOnly(libs.junit.jupiter.engine)
 }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java
index d242cfeef..c2b68d11d 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.catalog.lakehouse.hudi;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Map;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
@@ -54,7 +55,7 @@ public class HudiCatalogOperations implements 
CatalogOperations, SupportsSchemas
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HudiCatalogOperations.class);
 
-  private HudiCatalogBackendOps hudiCatalogBackendOps;
+  @VisibleForTesting HudiCatalogBackendOps hudiCatalogBackendOps;
 
   /**
    * Load the Hudi Catalog Backend and initialize the Hudi Catalog Operations.
@@ -69,7 +70,7 @@ public class HudiCatalogOperations implements 
CatalogOperations, SupportsSchemas
       Map<String, String> config, CatalogInfo info, HasPropertyMetadata 
propertiesMetadata)
       throws RuntimeException {
     HudiCatalogBackend hudiCatalogBackend = 
CatalogUtils.loadHudiCatalogBackend(config);
-    hudiCatalogBackendOps = hudiCatalogBackend.catalogOps();
+    hudiCatalogBackendOps = hudiCatalogBackend.backendOps();
   }
 
   /**
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogPropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogPropertiesMetadata.java
index 8325dcc0d..1f5142a28 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogPropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogPropertiesMetadata.java
@@ -18,14 +18,43 @@
  */
 package org.apache.gravitino.catalog.lakehouse.hudi;
 
-import java.util.Collections;
+import static 
org.apache.gravitino.connector.PropertyEntry.enumImmutablePropertyEntry;
+import static 
org.apache.gravitino.connector.PropertyEntry.stringRequiredPropertyEntry;
+
+import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import org.apache.gravitino.catalog.lakehouse.hudi.backend.BackendType;
 import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.gravitino.hive.ClientPropertiesMetadata;
 
 public class HudiCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetadata {
+  public static final String CATALOG_BACKEND = "catalog-backend";
+  public static final String URI = "uri";
+  private static final ClientPropertiesMetadata CLIENT_PROPERTIES_METADATA =
+      new ClientPropertiesMetadata();
+
+  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA =
+      ImmutableMap.<String, PropertyEntry<?>>builder()
+          .put(
+              CATALOG_BACKEND,
+              enumImmutablePropertyEntry(
+                  CATALOG_BACKEND,
+                  "Hudi catalog type choose properties",
+                  true /* required */,
+                  BackendType.class,
+                  null /* defaultValue */,
+                  false /* hidden */,
+                  false /* reserved */))
+          .put(
+              URI,
+              stringRequiredPropertyEntry(
+                  URI, "Hudi catalog uri config", false /* immutable */, false 
/* hidden */))
+          .putAll(CLIENT_PROPERTIES_METADATA.propertyEntries())
+          .build();
+
   @Override
   protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
-    return Collections.emptyMap();
+    return PROPERTIES_METADATA;
   }
 }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiColumn.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiColumn.java
new file mode 100644
index 000000000..540561c4d
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiColumn.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.hudi;
+
+import lombok.EqualsAndHashCode;
+import org.apache.gravitino.connector.BaseColumn;
+
+/** A class representing a column in a Hudi table. */
+@EqualsAndHashCode(callSuper = true)
+public class HudiColumn extends BaseColumn {
+  /**
+   * Creates a new instance of {@link Builder}.
+   *
+   * @return The new instance.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  private HudiColumn() {}
+
+  /** A builder class for constructing HudiColumn instances. */
+  public static class Builder extends BaseColumnBuilder<Builder, HudiColumn> {
+
+    /** Creates a new instance of {@link Builder}. */
+    private Builder() {}
+
+    /**
+     * Internal method to build a HudiColumn instance using the provided 
values.
+     *
+     * @return A new HudiColumn instance with the configured values.
+     */
+    @Override
+    protected HudiColumn internalBuild() {
+      HudiColumn hudiColumn = new HudiColumn();
+
+      hudiColumn.name = name;
+      hudiColumn.comment = comment;
+      hudiColumn.dataType = dataType;
+      hudiColumn.nullable = nullable;
+      hudiColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET : 
defaultValue;
+      return hudiColumn;
+    }
+  }
+}
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchema.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchema.java
index 391ab3579..ebfddc9e2 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchema.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchema.java
@@ -81,5 +81,10 @@ public abstract class HudiSchema<DATABASE> extends 
BaseSchema {
      * @return the HudiSchema
      */
     protected abstract HudiSchema<T> buildFromSchema(T schema);
+
+    @Override
+    public HudiSchema<T> build() {
+      return internalBuild();
+    }
   }
 }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchemaPropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchemaPropertiesMetadata.java
index c51fad43a..74be82305 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchemaPropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchemaPropertiesMetadata.java
@@ -18,14 +18,27 @@
  */
 package org.apache.gravitino.catalog.lakehouse.hudi;
 
-import java.util.Collections;
+import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.connector.BasePropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
 public class HudiSchemaPropertiesMetadata extends BasePropertiesMetadata {
+  public static final String LOCATION = "location";
+  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA =
+      ImmutableMap.<String, PropertyEntry<?>>builder()
+          .put(
+              LOCATION,
+              PropertyEntry.stringOptionalPropertyEntry(
+                  LOCATION,
+                  "The directory for Hudi dataset storage",
+                  false /* immutable */,
+                  null /* default value */,
+                  false /* hidden */))
+          .build();
+
   @Override
   protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
-    return Collections.emptyMap();
+    return PROPERTIES_METADATA;
   }
 }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTable.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTable.java
index 9f28a516a..5deb1e67f 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTable.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTable.java
@@ -68,5 +68,10 @@ public abstract class HudiTable<TABLE> extends BaseTable {
      * @return the HudiTable
      */
     protected abstract HudiTable<T> buildFromTable(T backendTable);
+
+    @Override
+    public HudiTable<T> build() {
+      return internalBuild();
+    }
   }
 }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTablePropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTablePropertiesMetadata.java
index 3b87867c0..3a1c72ae5 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTablePropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTablePropertiesMetadata.java
@@ -18,14 +18,46 @@
  */
 package org.apache.gravitino.catalog.lakehouse.hudi;
 
-import java.util.Collections;
+import static 
org.apache.gravitino.connector.PropertyEntry.stringImmutablePropertyEntry;
+import static 
org.apache.gravitino.connector.PropertyEntry.stringReservedPropertyEntry;
+
+import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.connector.BasePropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
 public class HudiTablePropertiesMetadata extends BasePropertiesMetadata {
+  public static final String COMMENT = "comment";
+  public static final String LOCATION = "location";
+  public static final String INPUT_FORMAT = "input-format";
+  public static final String OUTPUT_FORMAT = "output-format";
+
+  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA =
+      ImmutableMap.<String, PropertyEntry<?>>builder()
+          .put(COMMENT, stringReservedPropertyEntry(COMMENT, "table comment", 
true /* hidden */))
+          .put(
+              LOCATION,
+              stringImmutablePropertyEntry(
+                  LOCATION,
+                  "The location for Hudi table",
+                  false /* required */,
+                  null /* default value */,
+                  false /* hidden */,
+                  false /* reserved */))
+          .put(
+              INPUT_FORMAT,
+              stringReservedPropertyEntry(
+                  INPUT_FORMAT,
+                  "Hudi table input format used to distinguish the table type",
+                  false /* hidden */))
+          .put(
+              OUTPUT_FORMAT,
+              stringReservedPropertyEntry(
+                  OUTPUT_FORMAT, "Hudi table output format", false /* hidden 
*/))
+          .build();
+
   @Override
   protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
-    return Collections.emptyMap();
+    return PROPERTIES_METADATA;
   }
 }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/HudiCatalogBackend.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/HudiCatalogBackend.java
index e257e6019..eb9fdf9c4 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/HudiCatalogBackend.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/HudiCatalogBackend.java
@@ -26,20 +26,20 @@ public abstract class HudiCatalogBackend {
 
   private final BackendType backendType;
 
-  private final HudiCatalogBackendOps catalogOps;
+  private final HudiCatalogBackendOps backendOps;
 
   public abstract void initialize(Map<String, String> properties);
 
-  protected HudiCatalogBackend(BackendType backendType, HudiCatalogBackendOps 
catalogOps) {
+  protected HudiCatalogBackend(BackendType backendType, HudiCatalogBackendOps 
backendOps) {
     this.backendType = backendType;
-    this.catalogOps = catalogOps;
+    this.backendOps = backendOps;
   }
 
   public BackendType type() {
     return backendType;
   }
 
-  public HudiCatalogBackendOps catalogOps() {
-    return catalogOps;
+  public HudiCatalogBackendOps backendOps() {
+    return backendOps;
   }
 }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackend.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackend.java
index dfc6228f2..1726d22d7 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackend.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackend.java
@@ -37,7 +37,6 @@ public class HudiHMSBackend extends HudiCatalogBackend {
 
   @Override
   public void initialize(Map<String, String> properties) {
-    // todo: initialize the catalogOps
-    catalogOps().initialize(properties);
+    backendOps().initialize(properties);
   }
 }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java
index 3f21148b1..9b03518a6 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java
@@ -18,10 +18,19 @@
  */
 package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms;
 
+import static 
org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.URI;
+import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.catalog.lakehouse.hudi.HudiSchema;
+import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable;
 import org.apache.gravitino.catalog.lakehouse.hudi.ops.HudiCatalogBackendOps;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
@@ -29,28 +38,75 @@ import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.hive.CachedClientPool;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
 
 public class HudiHMSBackendOps implements HudiCatalogBackendOps {
 
+  // Mapping from Gravitino config to Hive config
+  private static final Map<String, String> CONFIG_CONVERTER =
+      ImmutableMap.of(URI, HiveConf.ConfVars.METASTOREURIS.varname);
+
+  private static final String HUDI_PACKAGE_PREFIX = "org.apache.hudi";
+
+  @VisibleForTesting CachedClientPool clientPool;
+
   @Override
   public void initialize(Map<String, String> properties) {
-    // todo: initialize the catalogOps
+    this.clientPool = new CachedClientPool(buildHiveConf(properties), 
properties);
   }
 
   @Override
-  public HudiHMSSchema loadSchema(NameIdentifier schemaIdent) throws NoSuchSchemaException {
-    throw new UnsupportedOperationException("Not implemented yet");
+  public HudiSchema loadSchema(NameIdentifier schemaIdent) throws NoSuchSchemaException {
+    // Fetches the metastore database named by the identifier and wraps it as a HudiHMSSchema.
+    try {
+      Database database = clientPool.run(client -> client.getDatabase(schemaIdent.name()));
+      return HudiHMSSchema.builder().withBackendSchema(database).build();
+
+    } catch (NoSuchObjectException | UnknownDBException e) {
+      throw new NoSuchSchemaException(
+          e, "Hudi schema (database) does not exist: %s in Hive Metastore", schemaIdent.name());
+
+    } catch (TException e) {
+      throw new RuntimeException(
+          "Failed to load Hudi schema (database) " + schemaIdent.name() + " from Hive Metastore",
+          e);
+
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before wrapping, so code further up the stack can still
+      // observe that this thread was asked to stop.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
-    throw new UnsupportedOperationException("Not implemented yet");
+    // Every database returned by the metastore is reported as a schema under the namespace.
+    try {
+      return clientPool.run(
+          c ->
+              c.getAllDatabases().stream()
+                  .map(db -> NameIdentifier.of(namespace, db))
+                  .toArray(NameIdentifier[]::new));
+
+    } catch (TException e) {
+      throw new RuntimeException(
+          "Failed to list all schemas (database) under namespace : "
+              + namespace
+              + " in Hive Metastore",
+          e);
+
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before wrapping, so code further up the stack can still
+      // observe that this thread was asked to stop.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -73,12 +129,58 @@ public class HudiHMSBackendOps implements 
HudiCatalogBackendOps {
 
   @Override
   public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
-    throw new UnsupportedOperationException("Not implemented yet");
+    // The namespace of a table is its schema identifier; its last level is the database name.
+    NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels());
+    if (!schemaExists(schemaIdent)) {
+      throw new NoSuchSchemaException("Schema (database) does not exist %s", namespace);
+    }
+
+    try {
+      return clientPool.run(
+          c -> {
+            // Full table objects are fetched because the Hudi check needs the input format.
+            List<String> allTables = c.getAllTables(schemaIdent.name());
+            return c.getTableObjectsByName(schemaIdent.name(), allTables).stream()
+                .filter(this::checkHudiTable)
+                .map(t -> NameIdentifier.of(namespace, t.getTableName()))
+                .toArray(NameIdentifier[]::new);
+          });
+
+    } catch (UnknownDBException e) {
+      // Keep the metastore exception as the cause, consistent with loadSchema.
+      throw new NoSuchSchemaException(
+          e, "Schema (database) does not exist %s in Hive Metastore", namespace);
+
+    } catch (TException e) {
+      throw new RuntimeException(
+          "Failed to list all tables under the namespace : " + namespace + " in Hive Metastore", e);
+
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before wrapping, so code further up the stack can still
+      // observe that this thread was asked to stop.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
-  public HudiHMSTable loadTable(NameIdentifier ident) throws NoSuchTableException {
-    throw new UnsupportedOperationException("Not implemented yet");
+  public HudiTable loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
+    // The namespace of the table identifier names the schema (database) that holds the table.
+    NameIdentifier schemaIdent = NameIdentifier.of(tableIdent.namespace().levels());
+
+    try {
+      Table table =
+          clientPool.run(client -> client.getTable(schemaIdent.name(), tableIdent.name()));
+      // A table that exists in the metastore but is not a Hudi table is reported as missing.
+      if (!checkHudiTable(table)) {
+        throw new NoSuchTableException(
+            "Table %s is not a Hudi table in Hive Metastore", tableIdent.name());
+      }
+      return HudiHMSTable.builder().withBackendTable(table).build();
+
+    } catch (NoSuchObjectException e) {
+      throw new NoSuchTableException(
+          e, "Hudi table does not exist: %s in Hive Metastore", tableIdent.name());
+
+    } catch (TException e) {
+      throw new RuntimeException(
+          "Failed to load Hudi table " + tableIdent.name() + " from Hive metastore", e);
+
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before wrapping, so code further up the stack can still
+      // observe that this thread was asked to stop.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -108,6 +210,38 @@ public class HudiHMSBackendOps implements 
HudiCatalogBackendOps {
 
   @Override
   public void close() {
-    // todo: close the HMS connection
+    if (clientPool != null) {
+      clientPool.close();
+      clientPool = null;
+    }
+  }
+
+  private boolean checkHudiTable(Table table) {
+    // here uses the input format to filter out non-Hudi tables, the COW table
+    // uses `org.apache.hudi.hadoop.HoodieParquetInputFormat` and MOR table
+    // uses `org.apache.hudi.hadoop.HoodieParquetRealtimeInputFormat`, to
+    // simplify the logic, we just check the prefix of the input format
+    if (table.getSd() == null) {
+      // No storage descriptor means no input format, so the table is treated as non-Hudi
+      // instead of failing with a NullPointerException.
+      return false;
+    }
+    String inputFormat = table.getSd().getInputFormat();
+    return inputFormat != null && inputFormat.startsWith(HUDI_PACKAGE_PREFIX);
+  }
+
+  private HiveConf buildHiveConf(Map<String, String> properties) {
+    Configuration hadoopConf = new Configuration();
+
+    Map<String, String> byPassConfigs = Maps.newHashMap();
+    Map<String, String> convertedConfigs = Maps.newHashMap();
+    properties.forEach(
+        (key, value) -> {
+          if (key.startsWith(CATALOG_BYPASS_PREFIX)) {
+            byPassConfigs.put(key.substring(CATALOG_BYPASS_PREFIX.length()), 
value);
+          } else if (CONFIG_CONVERTER.containsKey(key)) {
+            convertedConfigs.put(CONFIG_CONVERTER.get(key), value);
+          }
+        });
+    byPassConfigs.forEach(hadoopConf::set);
+    // Gravitino conf has higher priority than bypass conf
+    convertedConfigs.forEach(hadoopConf::set);
+
+    return new HiveConf(hadoopConf, HudiHMSBackendOps.class);
   }
 }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSSchema.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSSchema.java
index 636d76ba2..734261f5e 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSSchema.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSSchema.java
@@ -18,7 +18,12 @@
  */
 package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms;
 
+import static 
org.apache.gravitino.catalog.lakehouse.hudi.HudiSchemaPropertiesMetadata.LOCATION;
+
+import com.google.common.collect.Maps;
+import java.util.Optional;
 import org.apache.gravitino.catalog.lakehouse.hudi.HudiSchema;
+import org.apache.gravitino.meta.AuditInfo;
 import org.apache.hadoop.hive.metastore.api.Database;
 
 public class HudiHMSSchema extends HudiSchema<Database> {
@@ -39,7 +44,7 @@ public class HudiHMSSchema extends HudiSchema<Database> {
   public static class Builder extends HudiSchema.Builder<Database> {
 
     @Override
-    protected HudiSchema simpleBuild() {
+    protected HudiHMSSchema simpleBuild() {
       HudiHMSSchema schema = new HudiHMSSchema();
       schema.name = name;
       schema.comment = comment;
@@ -49,8 +54,17 @@ public class HudiHMSSchema extends HudiSchema<Database> {
     }
 
     @Override
-    protected HudiSchema buildFromSchema(Database schema) {
-      // todo: convert HMS database to HudiSchema
+    protected HudiHMSSchema buildFromSchema(Database database) {
+      name = database.getName();
+      comment = database.getDescription();
+
+      properties = Maps.newHashMap(database.getParameters());
+      properties.put(LOCATION, database.getLocationUri());
+
+      AuditInfo.Builder auditInfoBuilder = AuditInfo.builder();
+      
Optional.ofNullable(database.getOwnerName()).ifPresent(auditInfoBuilder::withCreator);
+      auditInfo = auditInfoBuilder.build();
+
       return simpleBuild();
     }
   }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java
index 4bd65b54d..e7c10ea1c 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java
@@ -18,7 +18,14 @@
  */
 package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms;
 
+import static 
org.apache.gravitino.catalog.lakehouse.hudi.HudiTablePropertiesMetadata.COMMENT;
+import static 
org.apache.gravitino.catalog.lakehouse.hudi.HudiTablePropertiesMetadata.INPUT_FORMAT;
+import static 
org.apache.gravitino.catalog.lakehouse.hudi.HudiTablePropertiesMetadata.LOCATION;
+import static 
org.apache.gravitino.catalog.lakehouse.hudi.HudiTablePropertiesMetadata.OUTPUT_FORMAT;
+
+import org.apache.gravitino.catalog.lakehouse.hudi.HudiColumn;
 import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable;
+import org.apache.gravitino.hive.converter.HiveTableConverter;
 import org.apache.hadoop.hive.metastore.api.Table;
 
 public class HudiHMSTable extends HudiTable<Table> {
@@ -44,6 +51,7 @@ public class HudiHMSTable extends HudiTable<Table> {
       table.columns = columns;
       table.indexes = indexes;
       table.partitioning = partitioning;
+      table.sortOrders = sortOrders;
       table.distribution = distribution;
       table.properties = properties;
       table.auditInfo = auditInfo;
@@ -51,8 +59,26 @@ public class HudiHMSTable extends HudiTable<Table> {
     }
 
     @Override
-    protected HudiTable buildFromTable(Table backendTable) {
-      // todo: convert HMS table to HudiTable
+    protected HudiHMSTable buildFromTable(Table hmsTable) {
+      name = hmsTable.getTableName();
+      comment = hmsTable.getParameters().get(COMMENT);
+      columns = HiveTableConverter.getColumns(hmsTable, HudiColumn.builder());
+      partitioning = HiveTableConverter.getPartitioning(hmsTable);
+
+      // Expected to be SortOrders.NONE: Hudi sorts data through clustering, which is run
+      // as a background table service (see https://hudi.apache.org/docs/next/clustering/)
+      sortOrders = HiveTableConverter.getSortOrders(hmsTable);
+
+      // Expected to be Distributions.NONE since Hudi doesn't support distribution
+      distribution = HiveTableConverter.getDistribution(hmsTable);
+      auditInfo = HiveTableConverter.getAuditInfo(hmsTable);
+
+      // Copy the parameters so the puts below do not mutate the caller's HMS Table object
+      properties = new java.util.HashMap<>(hmsTable.getParameters());
+      properties.put(LOCATION, hmsTable.getSd().getLocation());
+      properties.put(INPUT_FORMAT, hmsTable.getSd().getInputFormat());
+      properties.put(OUTPUT_FORMAT, hmsTable.getSd().getOutputFormat());
+
       return simpleBuild();
     }
   }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java
 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java
index 853d71159..c629ee55a 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java
@@ -18,7 +18,11 @@
  */
 package org.apache.gravitino.catalog.lakehouse.hudi.utils;
 
+import static 
org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.CATALOG_BACKEND;
+
+import java.util.Locale;
 import java.util.Map;
+import org.apache.gravitino.catalog.lakehouse.hudi.backend.BackendType;
 import org.apache.gravitino.catalog.lakehouse.hudi.backend.HudiCatalogBackend;
 import org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.HudiHMSBackend;
 
@@ -26,9 +30,15 @@ public class CatalogUtils {
   private CatalogUtils() {}
 
   public static HudiCatalogBackend loadHudiCatalogBackend(Map<String, String> 
properties) {
-    // todo: load and initialize the backend based on the properties
-    HudiCatalogBackend hudiHMSBackend = new HudiHMSBackend();
-    hudiHMSBackend.initialize(properties);
-    return hudiHMSBackend;
+    BackendType backendType =
+        
BackendType.valueOf(properties.get(CATALOG_BACKEND).toUpperCase(Locale.ROOT));
+    switch (backendType) {
+      case HMS:
+        HudiCatalogBackend hudiHMSBackend = new HudiHMSBackend();
+        hudiHMSBackend.initialize(properties);
+        return hudiHMSBackend;
+      default:
+        throw new UnsupportedOperationException("Unsupported backend type: " + 
backendType);
+    }
   }
 }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalog.java
 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalog.java
new file mode 100644
index 000000000..d49172fbc
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalog.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.hudi;
+
+import static 
org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.CATALOG_BACKEND;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Map;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.connector.CatalogOperations;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestHudiCatalog {
+  @Test
+  public void testOps() throws IOException {
+    try (HudiCatalog catalog = new HudiCatalog()) {
+      IllegalArgumentException exception =
+          Assertions.assertThrows(IllegalArgumentException.class, 
catalog::ops);
+      Assertions.assertEquals(
+          "entity and conf must be set before calling ops()", 
exception.getMessage());
+    }
+
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+    CatalogEntity entity =
+        CatalogEntity.builder()
+            .withId(1L)
+            .withName("catalog")
+            .withNamespace(Namespace.of("metalake"))
+            .withType(HudiCatalog.Type.RELATIONAL)
+            .withProvider("lakehouse-hudi")
+            .withAuditInfo(auditInfo)
+            .build();
+
+    Map<String, String> conf = ImmutableMap.of(CATALOG_BACKEND, "hms");
+    try (HudiCatalog catalog = new 
HudiCatalog().withCatalogConf(conf).withCatalogEntity(entity)) {
+      CatalogOperations ops = catalog.ops();
+      Assertions.assertInstanceOf(HudiCatalogOperations.class, ops);
+    }
+  }
+}
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalogOperations.java
 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalogOperations.java
new file mode 100644
index 000000000..16595da6a
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalogOperations.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.hudi;
+
+import static 
org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.CATALOG_BACKEND;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.gravitino.NameIdentifier;
+import 
org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.HudiHMSBackendOps;
+import org.apache.gravitino.catalog.lakehouse.hudi.ops.InMemoryBackendOps;
+import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.connector.PropertiesMetadata;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestHudiCatalogOperations {
+
+  private static final HasPropertyMetadata HUDI_PROPERTIES_METADATA =
+      new HasPropertyMetadata() {
+        @Override
+        public PropertiesMetadata tablePropertiesMetadata() throws 
UnsupportedOperationException {
+          return HudiCatalog.TABLE_PROPERTIES_METADATA;
+        }
+
+        @Override
+        public PropertiesMetadata catalogPropertiesMetadata() throws 
UnsupportedOperationException {
+          return HudiCatalog.CATALOG_PROPERTIES_METADATA;
+        }
+
+        @Override
+        public PropertiesMetadata schemaPropertiesMetadata() throws 
UnsupportedOperationException {
+          return HudiCatalog.SCHEMA_PROPERTIES_METADATA;
+        }
+
+        @Override
+        public PropertiesMetadata filesetPropertiesMetadata() throws 
UnsupportedOperationException {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public PropertiesMetadata topicPropertiesMetadata() throws 
UnsupportedOperationException {
+          throw new UnsupportedOperationException();
+        }
+      };
+
+  @Test
+  public void testInitialize() {
+    HudiCatalogOperations ops = new HudiCatalogOperations();
+    ops.initialize(ImmutableMap.of(CATALOG_BACKEND, "hms"), null, 
HUDI_PROPERTIES_METADATA);
+    Assertions.assertInstanceOf(HudiHMSBackendOps.class, 
ops.hudiCatalogBackendOps);
+    ops.close();
+  }
+
+  @Test
+  public void testTestConnection() throws Exception {
+    try (HudiCatalogOperations ops = new HudiCatalogOperations();
+        InMemoryBackendOps inMemoryBackendOps = new InMemoryBackendOps()) {
+      ops.hudiCatalogBackendOps = inMemoryBackendOps;
+
+      Assertions.assertDoesNotThrow(() -> ops.testConnection(null, null, null, 
null, null));
+    }
+  }
+
+  @Test
+  public void testListSchemas() throws Exception {
+    try (HudiCatalogOperations ops = new HudiCatalogOperations();
+        InMemoryBackendOps inMemoryBackendOps = new InMemoryBackendOps()) {
+      ops.hudiCatalogBackendOps = inMemoryBackendOps;
+
+      NameIdentifier[] schemas = ops.listSchemas(null);
+      Assertions.assertEquals(0, schemas.length);
+    }
+  }
+
+  @Test
+  public void testLoadSchema() throws Exception {
+    try (HudiCatalogOperations ops = new HudiCatalogOperations();
+        InMemoryBackendOps inMemoryBackendOps = new InMemoryBackendOps()) {
+      ops.hudiCatalogBackendOps = inMemoryBackendOps;
+
+      Assertions.assertThrows(
+          NoSuchSchemaException.class,
+          () -> ops.loadSchema(NameIdentifier.of("metalake", "catalog", 
"schema")));
+    }
+  }
+
+  @Test
+  public void testListTables() throws Exception {
+    try (HudiCatalogOperations ops = new HudiCatalogOperations();
+        InMemoryBackendOps inMemoryBackendOps = new InMemoryBackendOps()) {
+      ops.hudiCatalogBackendOps = inMemoryBackendOps;
+
+      NameIdentifier[] tables = ops.listTables(null);
+      Assertions.assertEquals(0, tables.length);
+    }
+  }
+
+  @Test
+  public void testLoadTable() throws Exception {
+    try (HudiCatalogOperations ops = new HudiCatalogOperations();
+        InMemoryBackendOps inMemoryBackendOps = new InMemoryBackendOps()) {
+      ops.hudiCatalogBackendOps = inMemoryBackendOps;
+
+      Assertions.assertThrows(
+          NoSuchTableException.class,
+          () -> ops.loadTable(NameIdentifier.of("metalake", "catalog", 
"table")));
+    }
+  }
+}
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSSchema.java
 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiSchema.java
similarity index 62%
copy from 
catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSSchema.java
copy to 
catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiSchema.java
index 636d76ba2..95d5dbdfa 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSSchema.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiSchema.java
@@ -16,31 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms;
+package org.apache.gravitino.catalog.lakehouse.hudi;
 
-import org.apache.gravitino.catalog.lakehouse.hudi.HudiSchema;
-import org.apache.hadoop.hive.metastore.api.Database;
-
-public class HudiHMSSchema extends HudiSchema<Database> {
+public class TestHudiSchema extends HudiSchema<TestHudiSchema> {
 
   public static Builder builder() {
     return new Builder();
   }
 
-  private HudiHMSSchema() {
-    super();
-  }
-
   @Override
-  public Database fromHudiSchema() {
-    throw new UnsupportedOperationException("Not implemented yet");
+  public TestHudiSchema fromHudiSchema() {
+    return this;
   }
 
-  public static class Builder extends HudiSchema.Builder<Database> {
+  public static class Builder extends HudiSchema.Builder<TestHudiSchema> {
 
     @Override
-    protected HudiSchema simpleBuild() {
-      HudiHMSSchema schema = new HudiHMSSchema();
+    protected TestHudiSchema simpleBuild() {
+      TestHudiSchema schema = new TestHudiSchema();
       schema.name = name;
       schema.comment = comment;
       schema.properties = properties;
@@ -49,8 +42,7 @@ public class HudiHMSSchema extends HudiSchema<Database> {
     }
 
     @Override
-    protected HudiSchema buildFromSchema(Database schema) {
-      // todo: convert HMS database to HudiSchema
+    protected HudiSchema<TestHudiSchema> buildFromSchema(TestHudiSchema 
schema) {
       return simpleBuild();
     }
   }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java
 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiTable.java
similarity index 58%
copy from 
catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java
copy to 
catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiTable.java
index 4bd65b54d..9914bff9d 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiTable.java
@@ -16,43 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms;
+package org.apache.gravitino.catalog.lakehouse.hudi;
 
-import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-public class HudiHMSTable extends HudiTable<Table> {
+public class TestHudiTable extends HudiTable<TestHudiTable> {
   public static Builder builder() {
     return new Builder();
   }
 
-  private HudiHMSTable() {
-    super();
-  }
-
   @Override
-  public Table fromHudiTable() {
-    throw new UnsupportedOperationException("Not implemented yet");
+  public TestHudiTable fromHudiTable() {
+    return this;
   }
 
-  public static class Builder extends HudiTable.Builder<Table> {
+  public static class Builder extends HudiTable.Builder<TestHudiTable> {
     @Override
-    protected HudiHMSTable simpleBuild() {
-      HudiHMSTable table = new HudiHMSTable();
+    protected TestHudiTable simpleBuild() {
+      TestHudiTable table = new TestHudiTable();
       table.name = name;
       table.comment = comment;
-      table.columns = columns;
-      table.indexes = indexes;
-      table.partitioning = partitioning;
-      table.distribution = distribution;
       table.properties = properties;
       table.auditInfo = auditInfo;
       return table;
     }
 
     @Override
-    protected HudiTable buildFromTable(Table backendTable) {
-      // todo: convert HMS table to HudiTable
+    protected HudiTable<TestHudiTable> buildFromTable(TestHudiTable table) {
       return simpleBuild();
     }
   }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackend.java
 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackend.java
similarity index 64%
copy from 
catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackend.java
copy to 
catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackend.java
index dfc6228f2..a921fd8c2 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackend.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackend.java
@@ -20,24 +20,18 @@ package 
org.apache.gravitino.catalog.lakehouse.hudi.backend.hms;
 
 import static 
org.apache.gravitino.catalog.lakehouse.hudi.backend.BackendType.HMS;
 
-import java.util.Map;
-import org.apache.gravitino.catalog.lakehouse.hudi.backend.BackendType;
+import com.google.common.collect.ImmutableMap;
 import org.apache.gravitino.catalog.lakehouse.hudi.backend.HudiCatalogBackend;
-import org.apache.gravitino.catalog.lakehouse.hudi.ops.HudiCatalogBackendOps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-public class HudiHMSBackend extends HudiCatalogBackend {
+public class TestHudiHMSBackend {
+  @Test
+  public void testInitialize() {
+    HudiCatalogBackend backend = new HudiHMSBackend();
+    backend.initialize(ImmutableMap.of());
 
-  public HudiHMSBackend() {
-    this(HMS, new HudiHMSBackendOps());
-  }
-
-  private HudiHMSBackend(BackendType backendType, HudiCatalogBackendOps 
catalogOps) {
-    super(backendType, catalogOps);
-  }
-
-  @Override
-  public void initialize(Map<String, String> properties) {
-    // todo: initialize the catalogOps
-    catalogOps().initialize(properties);
+    Assertions.assertEquals(HMS, backend.type());
+    Assertions.assertInstanceOf(HudiHMSBackendOps.class, backend.backendOps());
   }
 }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java
 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java
new file mode 100644
index 000000000..d1e531e84
--- /dev/null
+++ 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms;
+
+import static 
org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.URI;
+import static 
org.apache.gravitino.catalog.lakehouse.hudi.HudiSchemaPropertiesMetadata.LOCATION;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.catalog.lakehouse.hudi.HudiColumn;
+import org.apache.gravitino.catalog.lakehouse.hudi.HudiSchema;
+import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.hive.hms.MiniHiveMetastoreService;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.spark.sql.SparkSession;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class TestHudiHMSBackendOps extends MiniHiveMetastoreService {
+
+  private static final HudiHMSBackendOps ops = new HudiHMSBackendOps();
+  private static final String METALAKE_NAME = "metalake";
+  private static final String CATALOG_NAME = "catalog";
+  private static final String HIVE_TABLE_NAME = "hive_table";
+  private static final String HUDI_TABLE_NAME = "hudi_table";
+
+  @BeforeAll
+  public static void prepare() throws TException {
+    Map<String, String> props = Maps.newHashMap();
+    props.put(URI, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
+    ops.initialize(props);
+
+    // create a hive table
+    Table table = new Table();
+    table.setDbName(DB_NAME);
+    table.setTableName(HIVE_TABLE_NAME);
+    StorageDescriptor strgDesc = new StorageDescriptor();
+    strgDesc.setCols(Lists.newArrayList(new FieldSchema("col1", "string", 
"description")));
+    strgDesc.setSerdeInfo(new SerDeInfo());
+    table.setSd(strgDesc);
+    metastoreClient.createTable(table);
+
+    // use Spark to create a hudi table
+    SparkSession sparkSession =
+        SparkSession.builder()
+            .master("local[1]")
+            .appName("Hudi Catalog integration test")
+            .config("hive.metastore.uris", 
hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))
+            .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+            .config("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+            .config(
+                "spark.sql.catalog.spark_catalog",
+                "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
+            .config("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar")
+            .config("dfs.replication", "1")
+            .enableHiveSupport()
+            .getOrCreate();
+
+    // create a hudi table
+    sparkSession.sql(
+        String.format("CREATE TABLE %s.%s (ts BIGINT) USING HUDI", DB_NAME, 
HUDI_TABLE_NAME));
+  }
+
+  @AfterAll
+  public static void cleanup() throws TException {
+    ops.close();
+  }
+
+  @Test
+  public void testInitialize() {
+    try (HudiHMSBackendOps ops = new HudiHMSBackendOps()) {
+      ops.initialize(ImmutableMap.of());
+      Assertions.assertNotNull(ops.clientPool);
+    }
+  }
+
+  @Test
+  public void testLoadSchema() {
+    HudiSchema hudiSchema = ops.loadSchema(NameIdentifier.of(METALAKE_NAME, 
CATALOG_NAME, DB_NAME));
+
+    Assertions.assertEquals(DB_NAME, hudiSchema.name());
+    Assertions.assertEquals("description", hudiSchema.comment());
+    Assertions.assertNotNull(hudiSchema.properties().get(LOCATION));
+  }
+
+  @Test
+  public void testListSchemas() {
+    Namespace namespace = Namespace.of(METALAKE_NAME, CATALOG_NAME);
+    NameIdentifier[] schemas = ops.listSchemas(namespace);
+
+    Assertions.assertTrue(schemas.length > 0);
+    Assertions.assertTrue(Arrays.stream(schemas).anyMatch(schema -> 
schema.name().equals(DB_NAME)));
+  }
+
+  @Test
+  public void testListTables() {
+    Namespace namespace = Namespace.of(METALAKE_NAME, CATALOG_NAME, DB_NAME);
+    NameIdentifier[] tables = ops.listTables(namespace);
+
+    // all hive tables are filtered out
+    Assertions.assertEquals(1, tables.length);
+    Assertions.assertEquals(HUDI_TABLE_NAME, tables[0].name());
+  }
+
+  @Test
+  public void testLoadTable() {
+    Namespace namespace = Namespace.of(METALAKE_NAME, CATALOG_NAME, DB_NAME);
+    Exception exception =
+        Assertions.assertThrows(
+            NoSuchTableException.class,
+            () -> ops.loadTable(NameIdentifier.of(namespace, 
HIVE_TABLE_NAME)));
+    Assertions.assertEquals(
+        "Table hive_table is not a Hudi table in Hive Metastore", 
exception.getMessage());
+
+    HudiTable hudiTable = ops.loadTable(NameIdentifier.of(namespace, 
HUDI_TABLE_NAME));
+    Assertions.assertEquals(HUDI_TABLE_NAME, hudiTable.name());
+    Assertions.assertNull(hudiTable.comment());
+    Assertions.assertNotNull(hudiTable.properties().get(LOCATION));
+
+    Column[] columns = hudiTable.columns();
+    Assertions.assertEquals(6, columns.length);
+
+    Assertions.assertEquals(
+        HudiColumn.builder()
+            .withName("_hoodie_commit_time")
+            .withType(Types.StringType.get())
+            .build(),
+        columns[0]);
+    Assertions.assertEquals(
+        HudiColumn.builder()
+            .withName("_hoodie_commit_seqno")
+            .withType(Types.StringType.get())
+            .build(),
+        columns[1]);
+    Assertions.assertEquals(
+        HudiColumn.builder()
+            .withName("_hoodie_record_key")
+            .withType(Types.StringType.get())
+            .build(),
+        columns[2]);
+    Assertions.assertEquals(
+        HudiColumn.builder()
+            .withName("_hoodie_partition_path")
+            .withType(Types.StringType.get())
+            .build(),
+        columns[3]);
+    Assertions.assertEquals(
+        
HudiColumn.builder().withName("_hoodie_file_name").withType(Types.StringType.get()).build(),
+        columns[4]);
+    Assertions.assertEquals(
+        
HudiColumn.builder().withName("ts").withType(Types.LongType.get()).build(), 
columns[5]);
+  }
+}
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java
 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/ops/InMemoryBackendOps.java
similarity index 55%
copy from 
catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java
copy to 
catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/ops/InMemoryBackendOps.java
index 3f21148b1..eeb0ac295 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/ops/InMemoryBackendOps.java
@@ -16,13 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms;
+package org.apache.gravitino.catalog.lakehouse.hudi.ops;
 
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.SchemaChange;
-import org.apache.gravitino.catalog.lakehouse.hudi.ops.HudiCatalogBackendOps;
+import org.apache.gravitino.catalog.lakehouse.hudi.HudiSchema;
+import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable;
+import org.apache.gravitino.catalog.lakehouse.hudi.TestHudiSchema;
+import org.apache.gravitino.catalog.lakehouse.hudi.TestHudiTable;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
@@ -30,38 +35,61 @@ import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
 
-public class HudiHMSBackendOps implements HudiCatalogBackendOps {
+public class InMemoryBackendOps implements HudiCatalogBackendOps {
+  private final ConcurrentMap<NameIdentifier, TestHudiSchema> schemas;
+  private final ConcurrentMap<NameIdentifier, TestHudiTable> tables;
+
+  public InMemoryBackendOps() {
+    this.schemas = new ConcurrentHashMap<>();
+    this.tables = new ConcurrentHashMap<>();
+  }
 
   @Override
   public void initialize(Map<String, String> properties) {
-    // todo: initialize the catalogOps
+    // Do nothing
   }
 
   @Override
-  public HudiHMSSchema loadSchema(NameIdentifier schemaIdent) throws NoSuchSchemaException {
-    throw new UnsupportedOperationException("Not implemented yet");
+  public HudiSchema loadSchema(NameIdentifier schemaIdent) throws NoSuchSchemaException {
+    TestHudiSchema schema = schemas.get(schemaIdent);
+    if (schema == null) {
+      throw new NoSuchSchemaException("Schema %s does not exist", schemaIdent);
+    }
+    return schema;
   }
 
   @Override
   public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
-    throw new UnsupportedOperationException("Not implemented yet");
+    return schemas.keySet().toArray(new NameIdentifier[0]);
   }
 
   @Override
-  public HudiHMSSchema createSchema(
+  public HudiSchema createSchema(
       NameIdentifier ident, String comment, Map<String, String> properties)
       throws NoSuchCatalogException, SchemaAlreadyExistsException {
-    throw new UnsupportedOperationException("Not implemented yet");
+    if (schemas.containsKey(ident)) {
+      throw new SchemaAlreadyExistsException("Schema %s already exists", ident);
+    }
+
+    HudiSchema<TestHudiSchema> schema =
+        TestHudiSchema.builder()
+            .withName(ident.name())
+            .withComment(comment)
+            .withProperties(properties)
+            .build();
+    schemas.put(ident, schema.fromHudiSchema());
+    return schema;
   }
 
   @Override
-  public HudiHMSSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
+  public HudiSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
       throws NoSuchSchemaException {
     throw new UnsupportedOperationException("Not implemented yet");
   }
@@ -73,16 +101,20 @@ public class HudiHMSBackendOps implements HudiCatalogBackendOps {
 
   @Override
   public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
-    throw new UnsupportedOperationException("Not implemented yet");
+    return tables.keySet().toArray(new NameIdentifier[0]);
   }
 
   @Override
-  public HudiHMSTable loadTable(NameIdentifier ident) throws NoSuchTableException {
-    throw new UnsupportedOperationException("Not implemented yet");
+  public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
+    TestHudiTable table = tables.get(ident);
+    if (table == null) {
+      throw new NoSuchTableException("Table %s does not exist", ident);
+    }
+    return table;
   }
 
   @Override
-  public HudiHMSTable createTable(
+  public Table createTable(
       NameIdentifier ident,
       Column[] columns,
       String comment,
@@ -92,11 +124,22 @@ public class HudiHMSBackendOps implements HudiCatalogBackendOps {
       SortOrder[] sortOrders,
       Index[] indexes)
       throws NoSuchSchemaException, TableAlreadyExistsException {
-    throw new UnsupportedOperationException("Not implemented yet");
+    if (tables.containsKey(ident)) {
+      throw new TableAlreadyExistsException("Table %s already exists", ident);
+    }
+
+    HudiTable<TestHudiTable> table =
+        TestHudiTable.builder()
+            .withName(ident.name())
+            .withComment(comment)
+            .withProperties(properties)
+            .build();
+    tables.put(ident, table.fromHudiTable());
+    return table;
   }
 
   @Override
-  public HudiHMSTable alterTable(NameIdentifier ident, TableChange... changes)
+  public Table alterTable(NameIdentifier ident, TableChange... changes)
       throws NoSuchTableException, IllegalArgumentException {
     throw new UnsupportedOperationException("Not implemented yet");
   }
@@ -107,7 +150,8 @@ public class HudiHMSBackendOps implements HudiCatalogBackendOps {
   }
 
   @Override
-  public void close() {
-    // todo: close the HMS connection
+  public void close() throws Exception {
+    schemas.clear();
+    tables.clear();
   }
 }
diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/TestCatalogUtils.java
similarity index 60%
copy from catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java
copy to catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/TestCatalogUtils.java
index 853d71159..5f306d7bf 100644
--- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java
+++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/TestCatalogUtils.java
@@ -18,17 +18,21 @@
  */
 package org.apache.gravitino.catalog.lakehouse.hudi.utils;
 
-import java.util.Map;
+import static org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.CATALOG_BACKEND;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.gravitino.catalog.lakehouse.hudi.backend.HudiCatalogBackend;
 import org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.HudiHMSBackend;
+import org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.HudiHMSBackendOps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-public class CatalogUtils {
-  private CatalogUtils() {}
-
-  public static HudiCatalogBackend loadHudiCatalogBackend(Map<String, String> properties) {
-    // todo: load and initialize the backend based on the properties
-    HudiCatalogBackend hudiHMSBackend = new HudiHMSBackend();
-    hudiHMSBackend.initialize(properties);
-    return hudiHMSBackend;
+public class TestCatalogUtils {
+  @Test
+  public void testLoadHudiCatalogBackend() {
+    HudiCatalogBackend catalogBackend =
+        CatalogUtils.loadHudiCatalogBackend(ImmutableMap.of(CATALOG_BACKEND, "hms"));
+    Assertions.assertInstanceOf(HudiHMSBackend.class, catalogBackend);
+    Assertions.assertInstanceOf(HudiHMSBackendOps.class, catalogBackend.backendOps());
   }
 }
diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java
index 757d8df28..03cb233d4 100644
--- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java
+++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java
@@ -63,7 +63,7 @@ public class HiveTableConverter {
   }
 
   public static SortOrder[] getSortOrders(Table table) {
-    SortOrder[] sortOrders = new SortOrder[0];
+    SortOrder[] sortOrders = SortOrders.NONE;
     StorageDescriptor sd = table.getSd();
     if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
       sortOrders =

Reply via email to