This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-paimon-webui.git


The following commit(s) were added to refs/heads/main by this push:
     new c8ce4df  [Improve] Improve api module (#10)
c8ce4df is described below

commit c8ce4df46dbc947a8e6b797f660cacd66aac4ff1
Author: s7monk <[email protected]>
AuthorDate: Thu Jul 27 15:00:58 2023 +0800

    [Improve] Improve api module (#10)
---
 .../paimon/web/api/catalog/CatalogCreator.java     |   8 +-
 .../paimon/web/api/common/CatalogEntity.java       | 110 +++++
 .../paimon/web/api/common/CatalogProperties.java   |   4 +-
 .../{CatalogProperties.java => MetastoreType.java} |  18 +-
 .../{CatalogProperties.java => OperatorKind.java}  |  23 +-
 .../{CatalogProperties.java => WriteMode.java}     |  18 +-
 .../paimon/web/api/table/AlterTableEntity.java     | 137 ++++++
 .../ConsumerTableMetadata.java}                    |  31 +-
 .../paimon/web/api/table/FileTableMetadata.java    | 239 ++++++++++
 .../web/api/table/ManifestTableMetadata.java       | 105 +++++
 .../OptionTableMetadata.java}                      |  31 +-
 .../paimon/web/api/table/SchemaTableMetadata.java  | 120 +++++
 .../web/api/table/SnapshotTableMetadata.java       | 197 ++++++++
 .../apache/paimon/web/api/table/TableManager.java  | 525 +++++++++++++++++++++
 .../paimon/web/api/table/TagTableMetadata.java     | 108 +++++
 .../web/common/annotation/VisibleForTesting.java   |  28 +-
 .../web/common/utils/ParameterValidationUtil.java  |  22 +-
 17 files changed, 1662 insertions(+), 62 deletions(-)

diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/CatalogCreator.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/CatalogCreator.java
index 22514e6..143f854 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/CatalogCreator.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/CatalogCreator.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.web.api.common.CatalogProperties;
+import org.apache.paimon.web.api.common.MetastoreType;
 
 /** paimon catalog creator. */
 public class CatalogCreator {
@@ -33,11 +34,10 @@ public class CatalogCreator {
         return CatalogFactory.createCatalog(context);
     }
 
-    public static Catalog createHiveCatalog(
-            String warehouse, String metastore, String uri, String 
hiveConfDir) {
+    public static Catalog createHiveCatalog(String warehouse, String uri, 
String hiveConfDir) {
         Options options = new Options();
-        options.set(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
-        options.set(CatalogProperties.METASTORE_TYPE, metastore);
+        options.set(CatalogProperties.WAREHOUSE, warehouse);
+        options.set(CatalogProperties.METASTORE, 
MetastoreType.HIVE.toString());
         options.set(CatalogProperties.URI, uri);
         options.set(CatalogProperties.HIVE_CONF_DIR, hiveConfDir);
         CatalogContext context = CatalogContext.create(options);
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogEntity.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogEntity.java
new file mode 100644
index 0000000..abaf5dd
--- /dev/null
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogEntity.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.common;
+
+import javax.annotation.Nullable;
+
+/** catalog entity. */
+public class CatalogEntity {
+
+    private final Long catalogId;
+
+    private final String warehouse;
+
+    private final String metastoreType;
+
+    private final String uri;
+
+    private final String hiveConfDir;
+
+    public CatalogEntity(
+            Long catalogId,
+            String warehouse,
+            String metastoreType,
+            String uri,
+            String hiveConfDir) {
+        this.catalogId = catalogId;
+        this.warehouse = warehouse;
+        this.metastoreType = metastoreType;
+        this.uri = uri;
+        this.hiveConfDir = hiveConfDir;
+    }
+
+    public Long getCatalogId() {
+        return catalogId;
+    }
+
+    public String getWarehouse() {
+        return warehouse;
+    }
+
+    public String getMetastoreType() {
+        return metastoreType;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public String getHiveConfDir() {
+        return hiveConfDir;
+    }
+
+    public static CatalogEntity.Builder builder() {
+        return new Builder();
+    }
+
+    /** The builder for CatalogEntity. */
+    public static final class Builder {
+        private Long catalogId;
+        private String warehouse;
+        private String metastoreType;
+        @Nullable private String uri;
+        @Nullable private String hiveConfDir;
+
+        public Builder catalogId(Long catalogId) {
+            this.catalogId = catalogId;
+            return this;
+        }
+
+        public Builder warehouse(String warehouse) {
+            this.warehouse = warehouse;
+            return this;
+        }
+
+        public Builder metastoreType(String metastoreType) {
+            this.metastoreType = metastoreType;
+            return this;
+        }
+
+        public Builder uri(String uri) {
+            this.uri = uri;
+            return this;
+        }
+
+        public Builder hiveConfDir(String hiveConfDir) {
+            this.hiveConfDir = hiveConfDir;
+            return this;
+        }
+
+        public CatalogEntity build() {
+            return new CatalogEntity(catalogId, warehouse, metastoreType, uri, 
hiveConfDir);
+        }
+    }
+}
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
index c8930fd..7360df3 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
@@ -21,9 +21,9 @@ package org.apache.paimon.web.api.common;
 /** paimon catalog properties. */
 public class CatalogProperties {
 
-    public static final String METASTORE_TYPE = "metastore";
+    public static final String METASTORE = "metastore";
 
-    public static final String WAREHOUSE_LOCATION = "warehouse";
+    public static final String WAREHOUSE = "warehouse";
 
     public static final String URI = "uri";
 
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/MetastoreType.java
similarity index 73%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to 
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/MetastoreType.java
index c8930fd..6d2d385 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/MetastoreType.java
@@ -18,14 +18,18 @@
 
 package org.apache.paimon.web.api.common;
 
-/** paimon catalog properties. */
-public class CatalogProperties {
+/** Enum of catalog metastore type. */
+public enum MetastoreType {
+    FILE_SYSTEM("filesystem"),
+    HIVE("hive");
 
-    public static final String METASTORE_TYPE = "metastore";
+    private final String value;
 
-    public static final String WAREHOUSE_LOCATION = "warehouse";
+    MetastoreType(String value) {
+        this.value = value;
+    }
 
-    public static final String URI = "uri";
-
-    public static final String HIVE_CONF_DIR = "hive-conf-dir";
+    public String toString() {
+        return value;
+    }
 }
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/OperatorKind.java
similarity index 64%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to 
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/OperatorKind.java
index c8930fd..d549ed6 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/OperatorKind.java
@@ -18,14 +18,23 @@
 
 package org.apache.paimon.web.api.common;
 
-/** paimon catalog properties. */
-public class CatalogProperties {
+/** Enum of operator kind. */
+public enum OperatorKind {
+    ADD_COLUMN("add"),
+    RENAME_COLUMN("rename"),
+    DROP_COLUMN("drop"),
+    UPDATE_COLUMN_COMMENT("update_comment"),
+    UPDATE_COLUMN_TYPE("update_type"),
+    UPDATE_COLUMN_POSITION("update_position"),
+    UPDATE_COLUMN_NULLABILITY("update_nullability");
 
-    public static final String METASTORE_TYPE = "metastore";
+    private final String value;
 
-    public static final String WAREHOUSE_LOCATION = "warehouse";
+    OperatorKind(String value) {
+        this.value = value;
+    }
 
-    public static final String URI = "uri";
-
-    public static final String HIVE_CONF_DIR = "hive-conf-dir";
+    public String value() {
+        return value;
+    }
 }
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
 b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/WriteMode.java
similarity index 73%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to 
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/WriteMode.java
index c8930fd..7085fc5 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/WriteMode.java
@@ -18,14 +18,18 @@
 
 package org.apache.paimon.web.api.common;
 
-/** paimon catalog properties. */
-public class CatalogProperties {
+/** Enum of write mode. */
+public enum WriteMode {
+    STREAM("stream"),
+    BATCH("batch");
 
-    public static final String METASTORE_TYPE = "metastore";
+    private final String value;
 
-    public static final String WAREHOUSE_LOCATION = "warehouse";
+    WriteMode(String value) {
+        this.value = value;
+    }
 
-    public static final String URI = "uri";
-
-    public static final String HIVE_CONF_DIR = "hive-conf-dir";
+    public String getValue() {
+        return value;
+    }
 }
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/AlterTableEntity.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/AlterTableEntity.java
new file mode 100644
index 0000000..fea0d6a
--- /dev/null
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/AlterTableEntity.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.table;
+
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.web.api.common.OperatorKind;
+
+import javax.annotation.Nullable;
+
+/** alter table entity. */
+public class AlterTableEntity {
+
+    private final String columnName;
+    private final DataType type;
+    private final String comment;
+    private final String newColumn;
+    private final boolean isNullable;
+    private final SchemaChange.Move move;
+    private final OperatorKind kind;
+
+    public AlterTableEntity(
+            String columnName,
+            DataType type,
+            String comment,
+            String newColumn,
+            boolean isNullable,
+            SchemaChange.Move move,
+            OperatorKind kind) {
+        this.columnName = columnName;
+        this.type = type;
+        this.comment = comment;
+        this.newColumn = newColumn;
+        this.isNullable = isNullable;
+        this.move = move;
+        this.kind = kind;
+    }
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    public DataType getType() {
+        return type;
+    }
+
+    public String getComment() {
+        return comment;
+    }
+
+    public String getNewColumn() {
+        return newColumn;
+    }
+
+    public boolean isNullable() {
+        return isNullable;
+    }
+
+    public SchemaChange.Move getMove() {
+        return move;
+    }
+
+    public OperatorKind getKind() {
+        return kind;
+    }
+
+    public static AlterTableEntity.Builder builder() {
+        return new Builder();
+    }
+
+    /** The builder for AlterTableEntity. */
+    public static class Builder {
+        private String columnName;
+        @Nullable private DataType type;
+        @Nullable private String comment;
+        @Nullable private String newColumn;
+        @Nullable private boolean isNullable;
+        @Nullable private SchemaChange.Move move;
+        private OperatorKind kind;
+
+        public Builder columnName(String columnName) {
+            this.columnName = columnName;
+            return this;
+        }
+
+        public Builder type(DataType type) {
+            this.type = type;
+            return this;
+        }
+
+        public Builder comment(String comment) {
+            this.comment = comment;
+            return this;
+        }
+
+        public Builder newColumn(String newColumn) {
+            this.newColumn = newColumn;
+            return this;
+        }
+
+        public Builder nullable(boolean isNullable) {
+            this.isNullable = isNullable;
+            return this;
+        }
+
+        public Builder move(SchemaChange.Move move) {
+            this.move = move;
+            return this;
+        }
+
+        public Builder kind(OperatorKind kind) {
+            this.kind = kind;
+            return this;
+        }
+
+        public AlterTableEntity build() {
+            return new AlterTableEntity(
+                    columnName, type, comment, newColumn, isNullable, move, 
kind);
+        }
+    }
+}
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/ConsumerTableMetadata.java
similarity index 54%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to 
paimon-web-api/src/main/java/org/apache/paimon/web/api/table/ConsumerTableMetadata.java
index c8930fd..12e52dd 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/ConsumerTableMetadata.java
@@ -16,16 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.web.api.common;
+package org.apache.paimon.web.api.table;
 
-/** paimon catalog properties. */
-public class CatalogProperties {
+/** consumer table metadata. */
+public class ConsumerTableMetadata {
 
-    public static final String METASTORE_TYPE = "metastore";
+    private String consumerId;
 
-    public static final String WAREHOUSE_LOCATION = "warehouse";
+    private Long nextSnapshotId;
 
-    public static final String URI = "uri";
+    public ConsumerTableMetadata(String consumerId, Long nextSnapshotId) {
+        this.consumerId = consumerId;
+        this.nextSnapshotId = nextSnapshotId;
+    }
 
-    public static final String HIVE_CONF_DIR = "hive-conf-dir";
+    public String getConsumerId() {
+        return consumerId;
+    }
+
+    public void setConsumerId(String consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    public Long getNextSnapshotId() {
+        return nextSnapshotId;
+    }
+
+    public void setNextSnapshotId(Long nextSnapshotId) {
+        this.nextSnapshotId = nextSnapshotId;
+    }
 }
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/FileTableMetadata.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/FileTableMetadata.java
new file mode 100644
index 0000000..3c0776e
--- /dev/null
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/FileTableMetadata.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.table;
+
+import javax.annotation.Nullable;
+
+import java.time.LocalDateTime;
+
+/** file table metadata. */
+public class FileTableMetadata {
+
+    private final String partition;
+    private final Integer bucket;
+    private final String filePath;
+    private final String fileFormat;
+    private final Long schemaId;
+    private final Integer level;
+    private final Long recordCount;
+    private final Long fileSizeInBytes;
+    private final String minKey;
+    private final String maxKey;
+    private final String nullValueCounts;
+    private final String minValueStats;
+    private final String maxValueStats;
+    private final LocalDateTime creationTime;
+
+    public FileTableMetadata(
+            String partition,
+            Integer bucket,
+            String filePath,
+            String fileFormat,
+            Long schemaId,
+            Integer level,
+            Long recordCount,
+            Long fileSizeInBytes,
+            String minKey,
+            String maxKey,
+            String nullValueCounts,
+            String minValueStats,
+            String maxValueStats,
+            LocalDateTime creationTime) {
+        this.partition = partition;
+        this.bucket = bucket;
+        this.filePath = filePath;
+        this.fileFormat = fileFormat;
+        this.schemaId = schemaId;
+        this.level = level;
+        this.recordCount = recordCount;
+        this.fileSizeInBytes = fileSizeInBytes;
+        this.minKey = minKey;
+        this.maxKey = maxKey;
+        this.nullValueCounts = nullValueCounts;
+        this.minValueStats = minValueStats;
+        this.maxValueStats = maxValueStats;
+        this.creationTime = creationTime;
+    }
+
+    public String getPartition() {
+        return partition;
+    }
+
+    public Integer getBucket() {
+        return bucket;
+    }
+
+    public String getFilePath() {
+        return filePath;
+    }
+
+    public String getFileFormat() {
+        return fileFormat;
+    }
+
+    public Long getSchemaId() {
+        return schemaId;
+    }
+
+    public Integer getLevel() {
+        return level;
+    }
+
+    public Long getRecordCount() {
+        return recordCount;
+    }
+
+    public Long getFileSizeInBytes() {
+        return fileSizeInBytes;
+    }
+
+    public String getMinKey() {
+        return minKey;
+    }
+
+    public String getMaxKey() {
+        return maxKey;
+    }
+
+    public String getNullValueCounts() {
+        return nullValueCounts;
+    }
+
+    public String getMinValueStats() {
+        return minValueStats;
+    }
+
+    public String getMaxValueStats() {
+        return maxValueStats;
+    }
+
+    public LocalDateTime getCreationTime() {
+        return creationTime;
+    }
+
+    public static FileTableMetadata.Builder builder() {
+        return new Builder();
+    }
+
+    /** The builder for FileTableMetadata. */
+    public static final class Builder {
+        @Nullable private String partition;
+        private Integer bucket;
+        private String filePath;
+        private String fileFormat;
+        private Long schemaId;
+        private Integer level;
+        private Long recordCount;
+        private Long fileSizeInBytes;
+        private String minKey;
+        private String maxKey;
+        private String nullValueCounts;
+        @Nullable private String minValueStats;
+        @Nullable private String maxValueStats;
+        private LocalDateTime creationTime;
+
+        public Builder partition(String partition) {
+            this.partition = partition;
+            return this;
+        }
+
+        public Builder bucket(Integer bucket) {
+            this.bucket = bucket;
+            return this;
+        }
+
+        public Builder filePath(String filePath) {
+            this.filePath = filePath;
+            return this;
+        }
+
+        public Builder fileFormat(String fileFormat) {
+            this.fileFormat = fileFormat;
+            return this;
+        }
+
+        public Builder schemaId(Long schemaId) {
+            this.schemaId = schemaId;
+            return this;
+        }
+
+        public Builder level(Integer level) {
+            this.level = level;
+            return this;
+        }
+
+        public Builder recordCount(Long recordCount) {
+            this.recordCount = recordCount;
+            return this;
+        }
+
+        public Builder fileSizeInBytes(Long fileSizeInBytes) {
+            this.fileSizeInBytes = fileSizeInBytes;
+            return this;
+        }
+
+        public Builder minKey(String minKey) {
+            this.minKey = minKey;
+            return this;
+        }
+
+        public Builder maxKey(String maxKey) {
+            this.maxKey = maxKey;
+            return this;
+        }
+
+        public Builder nullValueCounts(String nullValueCounts) {
+            this.nullValueCounts = nullValueCounts;
+            return this;
+        }
+
+        public Builder minValueStats(String minValueStats) {
+            this.minValueStats = minValueStats;
+            return this;
+        }
+
+        public Builder maxValueStats(String maxValueStats) {
+            this.maxValueStats = maxValueStats;
+            return this;
+        }
+
+        public Builder creationTime(LocalDateTime creationTime) {
+            this.creationTime = creationTime;
+            return this;
+        }
+
+        public FileTableMetadata build() {
+            return new FileTableMetadata(
+                    partition,
+                    bucket,
+                    filePath,
+                    fileFormat,
+                    schemaId,
+                    level,
+                    recordCount,
+                    fileSizeInBytes,
+                    minKey,
+                    maxKey,
+                    nullValueCounts,
+                    minValueStats,
+                    maxValueStats,
+                    creationTime);
+        }
+    }
+}
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/ManifestTableMetadata.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/ManifestTableMetadata.java
new file mode 100644
index 0000000..574899c
--- /dev/null
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/ManifestTableMetadata.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.table;
+
+/** manifest table metadata. */
+public class ManifestTableMetadata {
+
+    private final String fileName;
+    private final Long fileSize;
+    private final Long numAddedFiles;
+    private final Long numDeletedFiles;
+    private final Long schemaId;
+
+    public ManifestTableMetadata(
+            String fileName,
+            Long fileSize,
+            Long numAddedFiles,
+            Long numDeletedFiles,
+            Long schemaId) {
+        this.fileName = fileName;
+        this.fileSize = fileSize;
+        this.numAddedFiles = numAddedFiles;
+        this.numDeletedFiles = numDeletedFiles;
+        this.schemaId = schemaId;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public Long getFileSize() {
+        return fileSize;
+    }
+
+    public Long getNumAddedFiles() {
+        return numAddedFiles;
+    }
+
+    public Long getNumDeletedFiles() {
+        return numDeletedFiles;
+    }
+
+    public Long getSchemaId() {
+        return schemaId;
+    }
+
+    public static ManifestTableMetadata.Builder builder() {
+        return new Builder();
+    }
+
+    /** The builder for ManifestTableMetadata. */
+    public static final class Builder {
+        private String fileName;
+        private Long fileSize;
+        private Long numAddedFiles;
+        private Long numDeletedFiles;
+        private Long schemaId;
+
+        public Builder fileName(String fileName) {
+            this.fileName = fileName;
+            return this;
+        }
+
+        public Builder fileSize(Long fileSize) {
+            this.fileSize = fileSize;
+            return this;
+        }
+
+        public Builder numAddedFiles(Long numAddedFiles) {
+            this.numAddedFiles = numAddedFiles;
+            return this;
+        }
+
+        public Builder numDeletedFiles(Long numDeletedFiles) {
+            this.numDeletedFiles = numDeletedFiles;
+            return this;
+        }
+
+        public Builder schemaId(Long schemaId) {
+            this.schemaId = schemaId;
+            return this;
+        }
+
+        public ManifestTableMetadata build() {
+            return new ManifestTableMetadata(
+                    fileName, fileSize, numAddedFiles, numDeletedFiles, 
schemaId);
+        }
+    }
+}
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/OptionTableMetadata.java
similarity index 60%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to 
paimon-web-api/src/main/java/org/apache/paimon/web/api/table/OptionTableMetadata.java
index c8930fd..ec259e0 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/OptionTableMetadata.java
@@ -16,16 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.web.api.common;
+package org.apache.paimon.web.api.table;
 
-/** paimon catalog properties. */
-public class CatalogProperties {
+/** options table metadata. */
+public class OptionTableMetadata {
 
-    public static final String METASTORE_TYPE = "metastore";
+    private String key;
 
-    public static final String WAREHOUSE_LOCATION = "warehouse";
+    private String value;
 
-    public static final String URI = "uri";
+    public OptionTableMetadata(String key, String value) {
+        this.key = key;
+        this.value = value;
+    }
 
-    public static final String HIVE_CONF_DIR = "hive-conf-dir";
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public void setValue(String value) {
+        this.value = value;
+    }
 }
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/SchemaTableMetadata.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/SchemaTableMetadata.java
new file mode 100644
index 0000000..e9744e5
--- /dev/null
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/SchemaTableMetadata.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.table;
+
+import javax.annotation.Nullable;
+
+/**
+ * Schema table metadata: an immutable holder for one schema entry of a table.
+ *
+ * <p>All values are stored exactly as passed in; no validation or conversion is performed. Use
+ * {@link #builder()} to assemble an instance field by field.
+ */
+public class SchemaTableMetadata {
+
+    private final Long schemaId;
+    private final String fields;
+    private final String partitionKeys;
+    private final String primaryKeys;
+    private final String options;
+    // The comment is optional, matching the @Nullable builder field.
+    @Nullable private final String comment;
+
+    /**
+     * Creates a schema metadata entry.
+     *
+     * @param schemaId identifier of the schema
+     * @param fields string form of the schema fields
+     * @param partitionKeys string form of the partition keys
+     * @param primaryKeys string form of the primary keys
+     * @param options string form of the schema options
+     * @param comment schema comment, or {@code null} when there is none
+     */
+    public SchemaTableMetadata(
+            Long schemaId,
+            String fields,
+            String partitionKeys,
+            String primaryKeys,
+            String options,
+            @Nullable String comment) {
+        this.schemaId = schemaId;
+        this.fields = fields;
+        this.partitionKeys = partitionKeys;
+        this.primaryKeys = primaryKeys;
+        this.options = options;
+        this.comment = comment;
+    }
+
+    /** Returns the schema identifier. */
+    public Long getSchemaId() {
+        return schemaId;
+    }
+
+    /** Returns the string form of the schema fields. */
+    public String getFields() {
+        return fields;
+    }
+
+    /** Returns the string form of the partition keys. */
+    public String getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    /** Returns the string form of the primary keys. */
+    public String getPrimaryKeys() {
+        return primaryKeys;
+    }
+
+    /** Returns the string form of the schema options. */
+    public String getOptions() {
+        return options;
+    }
+
+    /** Returns the schema comment, or {@code null} when none was set. */
+    @Nullable
+    public String getComment() {
+        return comment;
+    }
+
+    @Override
+    public String toString() {
+        return "SchemaTableMetadata{"
+                + "schemaId="
+                + schemaId
+                + ", fields='"
+                + fields
+                + '\''
+                + ", partitionKeys='"
+                + partitionKeys
+                + '\''
+                + ", primaryKeys='"
+                + primaryKeys
+                + '\''
+                + ", options='"
+                + options
+                + '\''
+                + ", comment='"
+                + comment
+                + '\''
+                + '}';
+    }
+
+    /** Returns a new, empty builder. */
+    public static SchemaTableMetadata.Builder builder() {
+        return new Builder();
+    }
+
+    /** The builder for SchemaTableMetadata; unset fields stay {@code null}. */
+    public static final class Builder {
+        private Long schemaId;
+        private String fields;
+        private String partitionKeys;
+        private String primaryKeys;
+        private String options;
+        @Nullable private String comment;
+
+        public Builder schemaId(Long schemaId) {
+            this.schemaId = schemaId;
+            return this;
+        }
+
+        public Builder fields(String fields) {
+            this.fields = fields;
+            return this;
+        }
+
+        public Builder partitionKeys(String partitionKeys) {
+            this.partitionKeys = partitionKeys;
+            return this;
+        }
+
+        public Builder primaryKeys(String primaryKeys) {
+            this.primaryKeys = primaryKeys;
+            return this;
+        }
+
+        public Builder options(String options) {
+            this.options = options;
+            return this;
+        }
+
+        public Builder comment(@Nullable String comment) {
+            this.comment = comment;
+            return this;
+        }
+
+        /** Builds the metadata entry from the values collected so far. */
+        public SchemaTableMetadata build() {
+            return new SchemaTableMetadata(
+                    schemaId, fields, partitionKeys, primaryKeys, options, comment);
+        }
+    }
+}
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/SnapshotTableMetadata.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/SnapshotTableMetadata.java
new file mode 100644
index 0000000..9bc0bd4
--- /dev/null
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/SnapshotTableMetadata.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.table;
+
+import javax.annotation.Nullable;
+
+import java.time.LocalDateTime;
+
+/**
+ * Snapshot table metadata: an immutable holder for one snapshot entry of a table.
+ *
+ * <p>All values are stored exactly as passed in; no validation or conversion is performed. The
+ * record counts and the watermark are optional and may be {@code null}. Use {@link #builder()}
+ * to assemble an instance field by field.
+ */
+public class SnapshotTableMetadata {
+
+    private final Long snapshotId;
+    private final String snapshotPath;
+    private final Long schemaId;
+    private final String commitUser;
+    private final Long commitIdentifier;
+    private final String commitKind;
+    private final LocalDateTime commitTime;
+    // Optional values, matching the @Nullable builder fields.
+    @Nullable private final Long totalRecordCount;
+    @Nullable private final Long deltaRecordCount;
+    @Nullable private final Long changelogRecordCount;
+    @Nullable private final Long watermark;
+
+    /**
+     * Creates a snapshot metadata entry.
+     *
+     * @param snapshotId identifier of the snapshot
+     * @param snapshotPath string form of the snapshot file path
+     * @param schemaId identifier of the schema the snapshot refers to
+     * @param commitUser user that produced the commit
+     * @param commitIdentifier identifier of the commit
+     * @param commitKind kind of the commit, as a string
+     * @param commitTime time of the commit
+     * @param totalRecordCount total record count, or {@code null} when unavailable
+     * @param deltaRecordCount delta record count, or {@code null} when unavailable
+     * @param changelogRecordCount changelog record count, or {@code null} when unavailable
+     * @param watermark watermark, or {@code null} when unavailable
+     */
+    public SnapshotTableMetadata(
+            Long snapshotId,
+            String snapshotPath,
+            Long schemaId,
+            String commitUser,
+            Long commitIdentifier,
+            String commitKind,
+            LocalDateTime commitTime,
+            @Nullable Long totalRecordCount,
+            @Nullable Long deltaRecordCount,
+            @Nullable Long changelogRecordCount,
+            @Nullable Long watermark) {
+        this.snapshotId = snapshotId;
+        this.snapshotPath = snapshotPath;
+        this.schemaId = schemaId;
+        this.commitUser = commitUser;
+        this.commitIdentifier = commitIdentifier;
+        this.commitKind = commitKind;
+        this.commitTime = commitTime;
+        this.totalRecordCount = totalRecordCount;
+        this.deltaRecordCount = deltaRecordCount;
+        this.changelogRecordCount = changelogRecordCount;
+        this.watermark = watermark;
+    }
+
+    /** Returns the snapshot identifier. */
+    public Long getSnapshotId() {
+        return snapshotId;
+    }
+
+    /** Returns the string form of the snapshot file path. */
+    public String getSnapshotPath() {
+        return snapshotPath;
+    }
+
+    /** Returns the identifier of the schema the snapshot refers to. */
+    public Long getSchemaId() {
+        return schemaId;
+    }
+
+    /** Returns the user that produced the commit. */
+    public String getCommitUser() {
+        return commitUser;
+    }
+
+    /** Returns the commit identifier. */
+    public Long getCommitIdentifier() {
+        return commitIdentifier;
+    }
+
+    /** Returns the commit kind as a string. */
+    public String getCommitKind() {
+        return commitKind;
+    }
+
+    /** Returns the commit time. */
+    public LocalDateTime getCommitTime() {
+        return commitTime;
+    }
+
+    /** Returns the total record count, or {@code null} when unavailable. */
+    @Nullable
+    public Long getTotalRecordCount() {
+        return totalRecordCount;
+    }
+
+    /** Returns the delta record count, or {@code null} when unavailable. */
+    @Nullable
+    public Long getDeltaRecordCount() {
+        return deltaRecordCount;
+    }
+
+    /** Returns the changelog record count, or {@code null} when unavailable. */
+    @Nullable
+    public Long getChangelogRecordCount() {
+        return changelogRecordCount;
+    }
+
+    /** Returns the watermark, or {@code null} when unavailable. */
+    @Nullable
+    public Long getWatermark() {
+        return watermark;
+    }
+
+    @Override
+    public String toString() {
+        return "SnapshotTableMetadata{"
+                + "snapshotId="
+                + snapshotId
+                + ", snapshotPath='"
+                + snapshotPath
+                + '\''
+                + ", schemaId="
+                + schemaId
+                + ", commitUser='"
+                + commitUser
+                + '\''
+                + ", commitIdentifier="
+                + commitIdentifier
+                + ", commitKind='"
+                + commitKind
+                + '\''
+                + ", commitTime="
+                + commitTime
+                + ", totalRecordCount="
+                + totalRecordCount
+                + ", deltaRecordCount="
+                + deltaRecordCount
+                + ", changelogRecordCount="
+                + changelogRecordCount
+                + ", watermark="
+                + watermark
+                + '}';
+    }
+
+    /** Returns a new, empty builder. */
+    public static SnapshotTableMetadata.Builder builder() {
+        return new Builder();
+    }
+
+    /** The builder for SnapshotTableMetadata; unset fields stay {@code null}. */
+    public static final class Builder {
+        private Long snapshotId;
+        private String snapshotPath;
+        private Long schemaId;
+        private String commitUser;
+        private Long commitIdentifier;
+        private String commitKind;
+        private LocalDateTime commitTime;
+        @Nullable private Long totalRecordCount;
+        @Nullable private Long deltaRecordCount;
+        @Nullable private Long changelogRecordCount;
+        @Nullable private Long watermark;
+
+        public Builder snapshotId(Long snapshotId) {
+            this.snapshotId = snapshotId;
+            return this;
+        }
+
+        public Builder snapshotPath(String snapshotPath) {
+            this.snapshotPath = snapshotPath;
+            return this;
+        }
+
+        public Builder schemaId(Long schemaId) {
+            this.schemaId = schemaId;
+            return this;
+        }
+
+        public Builder commitUser(String commitUser) {
+            this.commitUser = commitUser;
+            return this;
+        }
+
+        public Builder commitIdentifier(Long commitIdentifier) {
+            this.commitIdentifier = commitIdentifier;
+            return this;
+        }
+
+        public Builder commitKind(String commitKind) {
+            this.commitKind = commitKind;
+            return this;
+        }
+
+        public Builder commitTime(LocalDateTime commitTime) {
+            this.commitTime = commitTime;
+            return this;
+        }
+
+        public Builder totalRecordCount(@Nullable Long totalRecordCount) {
+            this.totalRecordCount = totalRecordCount;
+            return this;
+        }
+
+        public Builder deltaRecordCount(@Nullable Long deltaRecordCount) {
+            this.deltaRecordCount = deltaRecordCount;
+            return this;
+        }
+
+        public Builder changelogRecordCount(@Nullable Long changelogRecordCount) {
+            this.changelogRecordCount = changelogRecordCount;
+            return this;
+        }
+
+        public Builder watermark(@Nullable Long watermark) {
+            this.watermark = watermark;
+            return this;
+        }
+
+        /** Builds the metadata entry from the values collected so far. */
+        public SnapshotTableMetadata build() {
+            return new SnapshotTableMetadata(
+                    snapshotId,
+                    snapshotPath,
+                    schemaId,
+                    commitUser,
+                    commitIdentifier,
+                    commitKind,
+                    commitTime,
+                    totalRecordCount,
+                    deltaRecordCount,
+                    changelogRecordCount,
+                    watermark);
+        }
+    }
+}
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TableManager.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TableManager.java
index 287755d..d09baa4 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TableManager.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TableManager.java
@@ -19,12 +19,46 @@
 package org.apache.paimon.web.api.table;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.FileSystemCatalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableWrite;
+import org.apache.paimon.table.sink.WriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.web.api.common.CatalogEntity;
+import org.apache.paimon.web.api.common.CatalogProperties;
+import org.apache.paimon.web.api.common.MetastoreType;
+import org.apache.paimon.web.api.common.OperatorKind;
+import org.apache.paimon.web.api.common.WriteMode;
+import org.apache.paimon.web.common.annotation.VisibleForTesting;
+import org.apache.paimon.web.common.utils.ParameterValidationUtil;
 
 import com.google.common.collect.ImmutableList;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,9 +66,19 @@ import java.util.Map;
 /** paimon table manager. */
 public class TableManager {
 
+    // Suffixes of the per-table metadata tables. The list* methods of this class address such a
+    // table as `<tableName>$<suffix>` inside the same database.
+    private static final String SNAPSHOTS = "snapshots";
+    private static final String SCHEMAS = "schemas";
+    private static final String OPTIONS = "options";
+    private static final String MANIFESTS = "manifests";
+    private static final String FILES = "files";
+    private static final String CONSUMER = "consumers";
+    private static final String TAGS = "tags";
+
     public static void createTable(
             Catalog catalog, String dbName, String tableName, TableMetadata 
tableMetadata)
             throws Catalog.TableAlreadyExistException, 
Catalog.DatabaseNotExistException {
+        checkNotNull(catalog, dbName, tableName);
+
         Schema.Builder schemaBuilder =
                 Schema.newBuilder()
                         .partitionKeys(
@@ -60,34 +104,515 @@ public class TableManager {
     }
 
+    /**
+     * Tells whether table {@code tableName} exists in database {@code dbName} of the catalog.
+     *
+     * <p>The arguments are handed to {@code checkNotNull} before the lookup.
+     *
+     * @return whatever {@link Catalog#tableExists(Identifier)} reports for the identifier
+     */
     public static boolean tableExists(Catalog catalog, String dbName, String 
tableName) {
+        checkNotNull(catalog, dbName, tableName);
+
         Identifier identifier = Identifier.create(dbName, tableName);
         return catalog.tableExists(identifier);
     }
 
+    /**
+     * Looks up table {@code tableName} in database {@code dbName} of the catalog.
+     *
+     * <p>The arguments are handed to {@code checkNotNull} before the lookup.
+     *
+     * @return the table returned by {@link Catalog#getTable(Identifier)}
+     * @throws Catalog.TableNotExistException propagated from the catalog lookup
+     */
     public static Table getTable(Catalog catalog, String dbName, String 
tableName)
             throws Catalog.TableNotExistException {
+        checkNotNull(catalog, dbName, tableName);
+
         Identifier identifier = Identifier.create(dbName, tableName);
         return catalog.getTable(identifier);
     }
 
+    /**
+     * Lists the tables of database {@code dbName}.
+     *
+     * <p>The catalog and the database name are validated with
+     * {@code ParameterValidationUtil.checkNotNull} first; the labels passed alongside them
+     * ("Catalog", "Database name") identify the offending argument.
+     *
+     * @return the result of {@code catalog.listTables(dbName)}
+     * @throws Catalog.DatabaseNotExistException propagated from the catalog
+     */
     public static List<String> listTables(Catalog catalog, String dbName)
             throws Catalog.DatabaseNotExistException {
+        ParameterValidationUtil.checkNotNull(
+                new SimpleEntry<>(catalog, () -> "Catalog"),
+                new SimpleEntry<>(dbName, () -> "Database name"));
         return catalog.listTables(dbName);
     }
 
+    /**
+     * Drops table {@code tableName} from database {@code dbName}.
+     *
+     * <p>The arguments are handed to {@code checkNotNull} before the call.
+     *
+     * @throws Catalog.TableNotExistException propagated from {@code catalog.dropTable}
+     */
     public static void dropTable(Catalog catalog, String dbName, String 
tableName)
             throws Catalog.TableNotExistException {
+        checkNotNull(catalog, dbName, tableName);
+
         Identifier identifier = Identifier.create(dbName, tableName);
         // NOTE(review): the 'false' flag is presumably ignoreIfNotExists - confirm.
         catalog.dropTable(identifier, false);
     }
 
+    /**
+     * Renames table {@code fromTable} to {@code toTable}; both names are resolved inside the
+     * same database {@code dbName}.
+     *
+     * <p>All four arguments are validated with {@code ParameterValidationUtil.checkNotNull}
+     * before the identifiers are created.
+     *
+     * @throws Catalog.TableAlreadyExistException propagated from {@code catalog.renameTable}
+     * @throws Catalog.TableNotExistException propagated from {@code catalog.renameTable}
+     */
     public static void renameTable(Catalog catalog, String dbName, String 
fromTable, String toTable)
             throws Catalog.TableAlreadyExistException, 
Catalog.TableNotExistException {
+        ParameterValidationUtil.checkNotNull(
+                new SimpleEntry<>(catalog, () -> "Catalog"),
+                new SimpleEntry<>(dbName, () -> "Database name"),
+                new SimpleEntry<>(fromTable, () -> "From table name"),
+                new SimpleEntry<>(toTable, () -> "To table name"));
+
         Identifier fromTableIdentifier = Identifier.create(dbName, fromTable);
         Identifier toTableIdentifier = Identifier.create(dbName, toTable);
         // NOTE(review): the 'false' flag is presumably ignoreIfNotExists - confirm.
         catalog.renameTable(fromTableIdentifier, toTableIdentifier, false);
     }
 
+    /**
+     * Sets options on table {@code dbName.tableName}.
+     *
+     * <p>The supplied map is first passed through {@code handleOptions}. One
+     * {@code SchemaChange.setOption} change is created per resulting entry and all of them are
+     * submitted to the catalog in a single {@code alterTable} call.
+     */
+    public static void setOptions(
+            Catalog catalog, String dbName, String tableName, Map<String, String> options)
+            throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
+                    Catalog.ColumnNotExistException {
+        checkNotNull(catalog, dbName, tableName);
+
+        Identifier tableId = Identifier.create(dbName, tableName);
+        List<SchemaChange> changes = new ArrayList<>();
+        for (Map.Entry<String, String> option : handleOptions(options).entrySet()) {
+            changes.add(SchemaChange.setOption(option.getKey(), option.getValue()));
+        }
+
+        catalog.alterTable(tableId, changes, false);
+    }
+
+    /**
+     * Removes options from table {@code dbName.tableName}.
+     *
+     * <p>The supplied map is first passed through {@code handleOptions}; only the keys of the
+     * result are used. One {@code SchemaChange.removeOption} change is created per key and all
+     * of them are submitted to the catalog in a single {@code alterTable} call.
+     */
+    public static void removeOptions(
+            Catalog catalog, String dbName, String tableName, Map<String, String> options)
+            throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
+                    Catalog.ColumnNotExistException {
+        checkNotNull(catalog, dbName, tableName);
+
+        Identifier tableId = Identifier.create(dbName, tableName);
+        List<SchemaChange> removals = new ArrayList<>();
+        handleOptions(options)
+                .keySet()
+                .forEach(optionKey -> removals.add(SchemaChange.removeOption(optionKey)));
+
+        catalog.alterTable(tableId, removals, false);
+    }
+
+    /**
+     * Builds the schema change that adds a column described by {@code entity}.
+     *
+     * <p>Column name and type are validated as non-null; the comment is passed through as-is.
+     */
+    private static SchemaChange addColumn(AlterTableEntity entity) {
+        final String column = entity.getColumnName();
+        ParameterValidationUtil.checkNotNull(
+                new SimpleEntry<>(column, () -> "Column name"),
+                new SimpleEntry<>(entity.getType(), () -> "Column type"));
+
+        return SchemaChange.addColumn(column, entity.getType(), entity.getComment());
+    }
+
+    /**
+     * Builds the schema change that renames a column.
+     *
+     * <p>The old and new column names, the catalog, the database name and the table name are
+     * validated as non-null, then {@code validateColumnExistence} is run for the old column name
+     * before the change is created.
+     */
+    private static SchemaChange renameColumn(
+            Catalog catalog, String dbName, String tableName, AlterTableEntity entity)
+            throws Catalog.TableNotExistException, IOException {
+        final String oldName = entity.getColumnName();
+        final String newName = entity.getNewColumn();
+        ParameterValidationUtil.checkNotNull(
+                new SimpleEntry<>(oldName, () -> "Column name"),
+                new SimpleEntry<>(newName, () -> "New column name"),
+                new SimpleEntry<>(catalog, () -> "Catalog"),
+                new SimpleEntry<>(dbName, () -> "Database name"),
+                new SimpleEntry<>(tableName, () -> "Table name"));
+
+        validateColumnExistence(catalog, dbName, tableName, oldName);
+        return SchemaChange.renameColumn(oldName, newName);
+    }
+
+    /**
+     * Builds the schema change that drops a column.
+     *
+     * <p>The column name, the catalog, the database name and the table name are validated as
+     * non-null, then {@code validateColumnExistence} is run for the column before the change is
+     * created.
+     */
+    private static SchemaChange dropColumn(
+            Catalog catalog, String dbName, String tableName, AlterTableEntity entity)
+            throws Catalog.TableNotExistException, IOException {
+        final String column = entity.getColumnName();
+        ParameterValidationUtil.checkNotNull(
+                new SimpleEntry<>(column, () -> "Column name"),
+                new SimpleEntry<>(catalog, () -> "Catalog"),
+                new SimpleEntry<>(dbName, () -> "Database name"),
+                new SimpleEntry<>(tableName, () -> "Table name"));
+
+        validateColumnExistence(catalog, dbName, tableName, column);
+        return SchemaChange.dropColumn(column);
+    }
+
+    /**
+     * Builds the schema change that replaces the comment of a column.
+     *
+     * <p>The column name, the catalog, the database name and the table name are validated as
+     * non-null, then {@code validateColumnExistence} is run for the column. The new comment
+     * itself is not validated.
+     */
+    private static SchemaChange updateColumnComment(
+            Catalog catalog, String dbName, String tableName, AlterTableEntity entity)
+            throws Catalog.TableNotExistException, IOException {
+        final String column = entity.getColumnName();
+        ParameterValidationUtil.checkNotNull(
+                new SimpleEntry<>(column, () -> "Column name"),
+                new SimpleEntry<>(catalog, () -> "Catalog"),
+                new SimpleEntry<>(dbName, () -> "Database name"),
+                new SimpleEntry<>(tableName, () -> "Table name"));
+
+        validateColumnExistence(catalog, dbName, tableName, column);
+        return SchemaChange.updateColumnComment(column, entity.getComment());
+    }
+
+    /**
+     * Builds the schema change that replaces the type of a column.
+     *
+     * <p>The column name, the new type, the catalog, the database name and the table name are
+     * validated as non-null, then {@code validateColumnExistence} is run for the column before
+     * the change is created.
+     */
+    private static SchemaChange updateColumnType(
+            Catalog catalog, String dbName, String tableName, AlterTableEntity entity)
+            throws Catalog.TableNotExistException, IOException {
+        final String column = entity.getColumnName();
+        ParameterValidationUtil.checkNotNull(
+                new SimpleEntry<>(column, () -> "Column name"),
+                new SimpleEntry<>(entity.getType(), () -> "Column type"),
+                new SimpleEntry<>(catalog, () -> "Catalog"),
+                new SimpleEntry<>(dbName, () -> "Database name"),
+                new SimpleEntry<>(tableName, () -> "Table name"));
+
+        validateColumnExistence(catalog, dbName, tableName, column);
+        return SchemaChange.updateColumnType(column, entity.getType());
+    }
+
+    /**
+     * Builds the schema change that moves a column.
+     *
+     * <p>Only the move descriptor of {@code entity} is validated as non-null; no column
+     * existence check is performed here.
+     */
+    private static SchemaChange updateColumnPosition(AlterTableEntity entity) {
+        ParameterValidationUtil.checkNotNull(
+                new SimpleEntry<>(entity.getMove(), () -> "Move"));
+
+        return SchemaChange.updateColumnPosition(entity.getMove());
+    }
+
+    /**
+     * Builds the schema change that sets the nullability of a column to
+     * {@code entity.isNullable()}.
+     *
+     * <p>The column name, the catalog, the database name and the table name are validated as
+     * non-null, then {@code validateColumnExistence} is run for the column before the change is
+     * created.
+     */
+    private static SchemaChange updateColumnNullability(
+            Catalog catalog, String dbName, String tableName, AlterTableEntity entity)
+            throws Catalog.TableNotExistException, IOException {
+        final String column = entity.getColumnName();
+        ParameterValidationUtil.checkNotNull(
+                new SimpleEntry<>(column, () -> "Column name"),
+                new SimpleEntry<>(catalog, () -> "Catalog"),
+                new SimpleEntry<>(dbName, () -> "Database name"),
+                new SimpleEntry<>(tableName, () -> "Table name"));
+
+        validateColumnExistence(catalog, dbName, tableName, column);
+        return SchemaChange.updateColumnNullability(column, entity.isNullable());
+    }
+
+    /**
+     * Translates one {@link AlterTableEntity} into the matching {@link SchemaChange} by
+     * dispatching on its operator kind.
+     *
+     * @return the schema change for the entity; never {@code null}
+     * @throws UnsupportedOperationException if the operator kind has no handler here
+     */
+    private static SchemaChange performAlterTableAction(
+            Catalog catalog, String dbName, String tableName, AlterTableEntity entity)
+            throws Catalog.TableNotExistException, IOException {
+        OperatorKind kind = entity.getKind();
+        // Report a missing kind by name instead of failing inside the switch.
+        ParameterValidationUtil.checkNotNull(new SimpleEntry<>(kind, () -> "Operator kind"));
+
+        switch (kind) {
+            case ADD_COLUMN:
+                return addColumn(entity);
+            case RENAME_COLUMN:
+                return renameColumn(catalog, dbName, tableName, entity);
+            case DROP_COLUMN:
+                return dropColumn(catalog, dbName, tableName, entity);
+            case UPDATE_COLUMN_COMMENT:
+                return updateColumnComment(catalog, dbName, tableName, entity);
+            case UPDATE_COLUMN_TYPE:
+                return updateColumnType(catalog, dbName, tableName, entity);
+            case UPDATE_COLUMN_POSITION:
+                return updateColumnPosition(entity);
+            case UPDATE_COLUMN_NULLABILITY:
+                return updateColumnNullability(catalog, dbName, tableName, entity);
+            default:
+                // Returning null here would put a null entry into the list handed to
+                // Catalog#alterTable; fail early and name the offending kind instead.
+                throw new UnsupportedOperationException(
+                        "Unsupported alter table operation: " + kind);
+        }
+    }
+
+    /**
+     * Applies a list of alterations to table {@code dbName.tableName}.
+     *
+     * <p>Each entity is translated into a {@link SchemaChange} in list order, and all changes
+     * are submitted to the catalog in a single {@code alterTable} call.
+     *
+     * @param entities the alterations to apply; must not be {@code null}
+     * @throws IllegalArgumentException if an entity cannot be translated into a schema change
+     */
+    public static void alterTable(
+            Catalog catalog, String dbName, String tableName, List<AlterTableEntity> entities)
+            throws Catalog.TableNotExistException, IOException, Catalog.ColumnAlreadyExistException,
+                    Catalog.ColumnNotExistException {
+        checkNotNull(catalog, dbName, tableName);
+        ParameterValidationUtil.checkNotNull(
+                new SimpleEntry<>(entities, () -> "Alter table entities"));
+
+        Identifier identifier = Identifier.create(dbName, tableName);
+
+        List<SchemaChange> schemaChanges = new ArrayList<>(entities.size());
+
+        for (AlterTableEntity entity : entities) {
+            SchemaChange schemaChange = performAlterTableAction(catalog, dbName, tableName, entity);
+            // Never forward a null change to the catalog; name the kind that produced it.
+            if (schemaChange == null) {
+                throw new IllegalArgumentException(
+                        "Unsupported alter table operation: " + entity.getKind());
+            }
+            schemaChanges.add(schemaChange);
+        }
+
+        catalog.alterTable(identifier, schemaChanges, false);
+    }
+
+    /**
+     * Returns the schema entry with the greatest schema id among those reported by
+     * {@code listSchemas}, or {@code null} when the table has no schema entries.
+     */
+    @VisibleForTesting
+    private static SchemaTableMetadata getLatestSchema(
+            Catalog catalog, String dbName, String tableName)
+            throws Catalog.TableNotExistException, IOException {
+        SchemaTableMetadata latest = null;
+        for (SchemaTableMetadata candidate : listSchemas(catalog, dbName, tableName)) {
+            // Strict comparison keeps the first entry when ids tie.
+            if (latest == null || candidate.getSchemaId() > latest.getSchemaId()) {
+                latest = candidate;
+            }
+        }
+        return latest;
+    }
+
+    /**
+     * Lists the snapshots of a table by reading its {@code <tableName>$snapshots} metadata
+     * table.
+     *
+     * <p>The snapshot path of each entry is resolved through a {@link SnapshotManager} built
+     * from {@code catalogEntity}. The record counts and the watermark are optional: a null
+     * column yields a {@code null} value instead of being read as a {@code long}.
+     *
+     * @return one metadata entry per row read, in read order
+     */
+    public static List<SnapshotTableMetadata> listSnapshots(
+            Catalog catalog, CatalogEntity catalogEntity, String dbName, String tableName)
+            throws Catalog.TableNotExistException, IOException {
+        checkNotNull(catalog, dbName, tableName);
+
+        List<SnapshotTableMetadata> snapshots = new ArrayList<>();
+
+        Table table = getTable(catalog, dbName, "`" + tableName + "$" + SNAPSHOTS + "`");
+
+        SnapshotManager snapshotManager =
+                getSnapshotManager(catalog, catalogEntity, dbName, tableName);
+
+        RecordReader<InternalRow> reader = getReader(table);
+        // NOTE(review): column positions start at 1 here; confirm against the row type of the
+        // snapshots table that position 0 is skipped on purpose.
+        reader.forEachRemaining(
+                row -> {
+                    long snapshotId = row.getLong(1);
+                    SnapshotTableMetadata snapshotTableMetadata =
+                            SnapshotTableMetadata.builder()
+                                    .snapshotId(snapshotId)
+                                    .schemaId(row.getLong(2))
+                                    .commitUser(row.getString(3).toString())
+                                    .commitIdentifier(row.getLong(4))
+                                    .commitKind(row.getString(5).toString())
+                                    .commitTime(row.getTimestamp(6, 3).toLocalDateTime())
+                                    // The builder declares these four values as nullable, so
+                                    // guard each read with isNullAt.
+                                    .totalRecordCount(row.isNullAt(7) ? null : row.getLong(7))
+                                    .deltaRecordCount(row.isNullAt(8) ? null : row.getLong(8))
+                                    .changelogRecordCount(row.isNullAt(9) ? null : row.getLong(9))
+                                    .watermark(row.isNullAt(10) ? null : row.getLong(10))
+                                    .snapshotPath(
+                                            snapshotManager.snapshotPath(snapshotId).toString())
+                                    .build();
+                    snapshots.add(snapshotTableMetadata);
+                });
+
+        return snapshots;
+    }
+
+    /**
+     * Lists the schemas of a table by reading its {@code <tableName>$schemas} metadata table.
+     *
+     * <p>The schema comment is optional: a null column yields a {@code null} comment instead of
+     * being dereferenced.
+     *
+     * @return one metadata entry per row read, in read order
+     */
+    public static List<SchemaTableMetadata> listSchemas(
+            Catalog catalog, String dbName, String tableName)
+            throws Catalog.TableNotExistException, IOException {
+        checkNotNull(catalog, dbName, tableName);
+
+        List<SchemaTableMetadata> schemas = new ArrayList<>();
+
+        Table table = getTable(catalog, dbName, "`" + tableName + "$" + SCHEMAS + "`");
+
+        RecordReader<InternalRow> reader = getReader(table);
+        // NOTE(review): column positions start at 1 here; confirm against the row type of the
+        // schemas table that position 0 is skipped on purpose.
+        reader.forEachRemaining(
+                row -> {
+                    SchemaTableMetadata schemaTableMetadata =
+                            SchemaTableMetadata.builder()
+                                    .schemaId(row.getLong(1))
+                                    .fields(row.getString(2).toString())
+                                    .partitionKeys(row.getString(3).toString())
+                                    .primaryKeys(row.getString(4).toString())
+                                    .options(row.getString(5).toString())
+                                    // The builder declares the comment as nullable.
+                                    .comment(row.isNullAt(6) ? null : row.getString(6).toString())
+                                    .build();
+                    schemas.add(schemaTableMetadata);
+                });
+
+        return schemas;
+    }
+
+    /**
+     * Lists the options of a table by reading its {@code <tableName>$options} metadata table.
+     *
+     * @return one key/value entry per row read, in read order
+     */
+    public static List<OptionTableMetadata> listOptions(
+            Catalog catalog, String dbName, String tableName)
+            throws Catalog.TableNotExistException, IOException {
+        checkNotNull(catalog, dbName, tableName);
+
+        Table optionsTable = getTable(catalog, dbName, "`" + tableName + "$" + OPTIONS + "`");
+
+        List<OptionTableMetadata> result = new ArrayList<>();
+        getReader(optionsTable)
+                .forEachRemaining(
+                        row ->
+                                result.add(
+                                        new OptionTableMetadata(
+                                                row.getString(1).toString(),
+                                                row.getString(2).toString())));
+        return result;
+    }
+
+    /**
+     * Lists the manifests of a table by reading its {@code <tableName>$manifests} metadata
+     * table.
+     *
+     * @return one metadata entry per row read, in read order
+     */
+    public static List<ManifestTableMetadata> listManifests(
+            Catalog catalog, String dbName, String tableName)
+            throws Catalog.TableNotExistException, IOException {
+        checkNotNull(catalog, dbName, tableName);
+
+        Table manifestsTable = getTable(catalog, dbName, "`" + tableName + "$" + MANIFESTS + "`");
+
+        List<ManifestTableMetadata> result = new ArrayList<>();
+        getReader(manifestsTable)
+                .forEachRemaining(
+                        row ->
+                                result.add(
+                                        ManifestTableMetadata.builder()
+                                                .fileName(row.getString(1).toString())
+                                                .fileSize(row.getLong(2))
+                                                .numAddedFiles(row.getLong(3))
+                                                .numDeletedFiles(row.getLong(4))
+                                                .schemaId(row.getLong(5))
+                                                .build()));
+        return result;
+    }
+
+    /**
+     * Lists the data files of a table by reading its {@code <tableName>$files} metadata table.
+     *
+     * @return one metadata entry per row read, in read order
+     */
+    public static List<FileTableMetadata> listFiles(
+            Catalog catalog, String dbName, String tableName)
+            throws Catalog.TableNotExistException, IOException {
+        checkNotNull(catalog, dbName, tableName);
+
+        Table filesTable = getTable(catalog, dbName, "`" + tableName + "$" + FILES + "`");
+
+        List<FileTableMetadata> result = new ArrayList<>();
+        getReader(filesTable)
+                .forEachRemaining(
+                        row ->
+                                result.add(
+                                        FileTableMetadata.builder()
+                                                .partition(row.getString(1).toString())
+                                                .bucket(row.getInt(2))
+                                                .filePath(row.getString(3).toString())
+                                                .fileFormat(row.getString(4).toString())
+                                                .schemaId(row.getLong(5))
+                                                .level(row.getInt(6))
+                                                .fileSizeInBytes(row.getLong(7))
+                                                .minKey(row.getString(8).toString())
+                                                .maxKey(row.getString(9).toString())
+                                                .nullValueCounts(row.getString(10).toString())
+                                                .minValueStats(row.getString(11).toString())
+                                                .maxValueStats(row.getString(12).toString())
+                                                .creationTime(
+                                                        row.getTimestamp(13, 6).toLocalDateTime())
+                                                .build()));
+        return result;
+    }
+
+    /**
+     * Lists the consumers of a table by reading its {@code <tableName>$consumers} metadata
+     * table.
+     *
+     * @return one metadata entry per row read, in read order
+     */
+    public static List<ConsumerTableMetadata> listConsumers(
+            Catalog catalog, String dbName, String tableName)
+            throws Catalog.TableNotExistException, IOException {
+        checkNotNull(catalog, dbName, tableName);
+
+        Table consumersTable = getTable(catalog, dbName, "`" + tableName + "$" + CONSUMER + "`");
+
+        List<ConsumerTableMetadata> result = new ArrayList<>();
+        getReader(consumersTable)
+                .forEachRemaining(
+                        row ->
+                                result.add(
+                                        new ConsumerTableMetadata(
+                                                row.getString(1).toString(), row.getLong(2))));
+        return result;
+    }
+
+    /**
+     * Lists the tags of a table by reading its {@code <tableName>$tags} metadata table.
+     *
+     * @return one metadata entry per row read, in read order
+     */
+    public static List<TagTableMetadata> listTags(Catalog catalog, String dbName, String tableName)
+            throws Catalog.TableNotExistException, IOException {
+        checkNotNull(catalog, dbName, tableName);
+
+        Table tagsTable = getTable(catalog, dbName, "`" + tableName + "$" + TAGS + "`");
+
+        List<TagTableMetadata> result = new ArrayList<>();
+        getReader(tagsTable)
+                .forEachRemaining(
+                        row ->
+                                result.add(
+                                        TagTableMetadata.builder()
+                                                .tagName(row.getString(1).toString())
+                                                .snapshotId(row.getLong(2))
+                                                .schemaId(row.getLong(3))
+                                                .createTime(
+                                                        row.getTimestamp(4, 3).toLocalDateTime())
+                                                .recordCount(row.getLong(5))
+                                                .build()));
+        return result;
+    }
+
+    @VisibleForTesting
+    private static SnapshotManager getSnapshotManager(
+            Catalog catalog, CatalogEntity catalogEntity, String dbName, 
String tableName)
+            throws IOException {
+        String warehouse = catalogEntity.getWarehouse();
+
+        FileIO fileIO =
+                FileIO.get(
+                        new Path(warehouse),
+                        CatalogContext.create(buildOptions(catalog, 
catalogEntity)));
+
+        String tablePath = warehouse + "/" + dbName + ".db" + "/" + tableName;
+        return new SnapshotManager(fileIO, new Path(tablePath));
+    }
+
+    private static Options buildOptions(Catalog catalog, CatalogEntity 
catalogEntity) {
+        Options options = new Options();
+        if (catalog instanceof FileSystemCatalog) {
+            options.set(CatalogProperties.WAREHOUSE, 
catalogEntity.getWarehouse());
+        } else if (catalog instanceof HiveCatalog) {
+            options.set(CatalogProperties.WAREHOUSE, 
catalogEntity.getWarehouse());
+            options.set(CatalogProperties.METASTORE, 
MetastoreType.HIVE.toString());
+            options.set(CatalogProperties.URI, catalogEntity.getUri());
+            options.set(CatalogProperties.HIVE_CONF_DIR, 
catalogEntity.getHiveConfDir());
+        }
+        return options;
+    }
+
+    @VisibleForTesting
+    private static RecordReader<InternalRow> getReader(Table table) {
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        TableRead tableRead = readBuilder.newRead();
+        try {
+            return tableRead.createReader(splits);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @VisibleForTesting
+    private static WriteBuilder getWriteBuilder(
+            Table table, String writeMode, @Nullable Map<String, String> 
staticPartition) {
+        if (writeMode.equals(WriteMode.BATCH.getValue())) {
+            return table.newBatchWriteBuilder().withOverwrite(staticPartition);
+        } else {
+            return table.newStreamWriteBuilder();
+        }
+    }
+
+    public static TableWrite getBatchTableWriter(
+            Table table, @Nullable Map<String, String> staticPartition) {
+        BatchWriteBuilder writeBuilder =
+                (BatchWriteBuilder)
+                        getWriteBuilder(table, WriteMode.BATCH.getValue(), 
staticPartition);
+        return writeBuilder.newWrite();
+    }
+
+    public static TableWrite getStreamTableWriter(Table table) {
+        StreamWriteBuilder writeBuilder =
+                (StreamWriteBuilder) getWriteBuilder(table, 
WriteMode.STREAM.getValue(), null);
+        return writeBuilder.newWrite();
+    }
+
+    public static void batchWrite(
+            List<GenericRow> records,
+            Catalog catalog,
+            String dbName,
+            String tableName,
+            @Nullable Map<String, String> staticPartition)
+            throws Exception {
+        checkNotNull(catalog, dbName, tableName);
+
+        BatchWriteBuilder writeBuilder =
+                (BatchWriteBuilder)
+                        getWriteBuilder(
+                                getTable(catalog, dbName, tableName),
+                                WriteMode.BATCH.getValue(),
+                                staticPartition);
+
+        List<CommitMessage> commitMessages;
+        try (BatchTableWrite writer = writeBuilder.newWrite()) {
+
+            for (GenericRow record : records) {
+                writer.write(record);
+            }
+
+            commitMessages = writer.prepareCommit();
+        }
+
+        try (BatchTableCommit commit = writeBuilder.newCommit()) {
+            commit.commit(commitMessages);
+        }
+    }
+
+    private static void checkNotNull(Catalog catalog, String dbName, String 
tableName) {
+        ParameterValidationUtil.checkNotNull(
+                new SimpleEntry<>(catalog, () -> "Catalog"),
+                new SimpleEntry<>(dbName, () -> "Database name"),
+                new SimpleEntry<>(tableName, () -> "Table name"));
+    }
+
+    private static void validateColumnExistence(
+            Catalog catalog, String dbName, String tableName, String 
columnName)
+            throws Catalog.TableNotExistException, IOException {
+        SchemaTableMetadata latestSchema = getLatestSchema(catalog, dbName, 
tableName);
+        if (!latestSchema.getFields().contains(columnName)) {
+            throw new RuntimeException("Column not found: " + columnName);
+        }
+    }
+
     private static Map<String, String> handleOptions(Map<String, String> 
options) {
         List<String> keys = TableOptionExtractor.keys();
         Map<String, String> filteredOptions = new HashMap<>();
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TagTableMetadata.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TagTableMetadata.java
new file mode 100644
index 0000000..f26170c
--- /dev/null
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TagTableMetadata.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.table;
+
+import javax.annotation.Nullable;
+
+import java.time.LocalDateTime;
+
+/** tag table metadata. */
+public class TagTableMetadata {
+
+    private final String tagName;
+    private final Long snapshotId;
+    private final Long schemaId;
+    private final LocalDateTime createTime;
+    private final Long recordCount;
+
+    public TagTableMetadata(
+            String tagName,
+            Long snapshotId,
+            Long schemaId,
+            LocalDateTime createTime,
+            Long recordCount) {
+        this.tagName = tagName;
+        this.snapshotId = snapshotId;
+        this.schemaId = schemaId;
+        this.createTime = createTime;
+        this.recordCount = recordCount;
+    }
+
+    public String getTagName() {
+        return tagName;
+    }
+
+    public Long getSnapshotId() {
+        return snapshotId;
+    }
+
+    public Long getSchemaId() {
+        return schemaId;
+    }
+
+    public LocalDateTime getCreateTime() {
+        return createTime;
+    }
+
+    public Long getRecordCount() {
+        return recordCount;
+    }
+
+    public static TagTableMetadata.Builder builder() {
+        return new Builder();
+    }
+
+    /** The builder for TagTableMetadata. */
+    public static final class Builder {
+        private String tagName;
+        private Long snapshotId;
+        private Long schemaId;
+        private LocalDateTime createTime;
+        @Nullable private Long recordCount;
+
+        public Builder tagName(String tagName) {
+            this.tagName = tagName;
+            return this;
+        }
+
+        public Builder snapshotId(Long snapshotId) {
+            this.snapshotId = snapshotId;
+            return this;
+        }
+
+        public Builder schemaId(Long schemaId) {
+            this.schemaId = schemaId;
+            return this;
+        }
+
+        public Builder createTime(LocalDateTime createTime) {
+            this.createTime = createTime;
+            return this;
+        }
+
+        public Builder recordCount(Long recordCount) {
+            this.recordCount = recordCount;
+            return this;
+        }
+
+        public TagTableMetadata build() {
+            return new TagTableMetadata(tagName, snapshotId, schemaId, 
createTime, recordCount);
+        }
+    }
+}
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
 
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/annotation/VisibleForTesting.java
similarity index 51%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to 
paimon-web-common/src/main/java/org/apache/paimon/web/common/annotation/VisibleForTesting.java
index c8930fd..c453fef 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++ 
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/annotation/VisibleForTesting.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,16 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.web.api.common;
+package org.apache.paimon.web.common.annotation;
 
-/** paimon catalog properties. */
-public class CatalogProperties {
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
 
-    public static final String METASTORE_TYPE = "metastore";
-
-    public static final String WAREHOUSE_LOCATION = "warehouse";
-
-    public static final String URI = "uri";
-
-    public static final String HIVE_CONF_DIR = "hive-conf-dir";
-}
+/**
+ * This annotation declares that a function, field, constructor, or entire 
type, is only visible
+ * for testing purposes.
+ *
+ * <p>This annotation is typically attached when for example a method should 
be {@code private}
+ * (because it is not intended to be called externally), but cannot be 
declared private, because
+ * some tests need to have access to it.
+ */
+@Documented
+@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, 
ElementType.CONSTRUCTOR})
+public @interface VisibleForTesting {}
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
 
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/utils/ParameterValidationUtil.java
similarity index 61%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to 
paimon-web-common/src/main/java/org/apache/paimon/web/common/utils/ParameterValidationUtil.java
index c8930fd..aebad3e 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++ 
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/utils/ParameterValidationUtil.java
@@ -16,16 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.web.api.common;
+package org.apache.paimon.web.common.utils;
 
-/** paimon catalog properties. */
-public class CatalogProperties {
+import java.util.Map;
+import java.util.function.Supplier;
 
-    public static final String METASTORE_TYPE = "metastore";
+/** parameter validation util. */
+public class ParameterValidationUtil {
 
-    public static final String WAREHOUSE_LOCATION = "warehouse";
-
-    public static final String URI = "uri";
-
-    public static final String HIVE_CONF_DIR = "hive-conf-dir";
+    @SafeVarargs
+    public static void checkNotNull(Map.Entry<Object, Supplier<String>>... 
entries) {
+        for (Map.Entry<Object, Supplier<String>> entry : entries) {
+            if (entry.getKey() == null) {
+                throw new IllegalArgumentException(entry.getValue().get() + " 
can not be null.");
+            }
+        }
+    }
 }

Reply via email to