This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-paimon-webui.git
The following commit(s) were added to refs/heads/main by this push:
new c8ce4df [Imrove] Improve api module (#10)
c8ce4df is described below
commit c8ce4df46dbc947a8e6b797f660cacd66aac4ff1
Author: s7monk <[email protected]>
AuthorDate: Thu Jul 27 15:00:58 2023 +0800
[Imrove] Improve api module (#10)
---
.../paimon/web/api/catalog/CatalogCreator.java | 8 +-
.../paimon/web/api/common/CatalogEntity.java | 110 +++++
.../paimon/web/api/common/CatalogProperties.java | 4 +-
.../{CatalogProperties.java => MetastoreType.java} | 18 +-
.../{CatalogProperties.java => OperatorKind.java} | 23 +-
.../{CatalogProperties.java => WriteMode.java} | 18 +-
.../paimon/web/api/table/AlterTableEntity.java | 137 ++++++
.../ConsumerTableMetadata.java} | 31 +-
.../paimon/web/api/table/FileTableMetadata.java | 239 ++++++++++
.../web/api/table/ManifestTableMetadata.java | 105 +++++
.../OptionTableMetadata.java} | 31 +-
.../paimon/web/api/table/SchemaTableMetadata.java | 120 +++++
.../web/api/table/SnapshotTableMetadata.java | 197 ++++++++
.../apache/paimon/web/api/table/TableManager.java | 525 +++++++++++++++++++++
.../paimon/web/api/table/TagTableMetadata.java | 108 +++++
.../web/common/annotation/VisibleForTesting.java | 28 +-
.../web/common/utils/ParameterValidationUtil.java | 22 +-
17 files changed, 1662 insertions(+), 62 deletions(-)
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/CatalogCreator.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/CatalogCreator.java
index 22514e6..143f854 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/CatalogCreator.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/CatalogCreator.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.web.api.common.CatalogProperties;
+import org.apache.paimon.web.api.common.MetastoreType;
/** paimon catalog creator. */
public class CatalogCreator {
@@ -33,11 +34,10 @@ public class CatalogCreator {
return CatalogFactory.createCatalog(context);
}
- public static Catalog createHiveCatalog(
- String warehouse, String metastore, String uri, String
hiveConfDir) {
+ public static Catalog createHiveCatalog(String warehouse, String uri,
String hiveConfDir) {
Options options = new Options();
- options.set(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
- options.set(CatalogProperties.METASTORE_TYPE, metastore);
+ options.set(CatalogProperties.WAREHOUSE, warehouse);
+ options.set(CatalogProperties.METASTORE,
MetastoreType.HIVE.toString());
options.set(CatalogProperties.URI, uri);
options.set(CatalogProperties.HIVE_CONF_DIR, hiveConfDir);
CatalogContext context = CatalogContext.create(options);
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogEntity.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogEntity.java
new file mode 100644
index 0000000..abaf5dd
--- /dev/null
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogEntity.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.common;
+
+import javax.annotation.Nullable;
+
+/** catalog entity. */
+public class CatalogEntity {
+
+ private final Long catalogId;
+
+ private final String warehouse;
+
+ private final String metastoreType;
+
+ private final String uri;
+
+ private final String hiveConfDir;
+
+ public CatalogEntity(
+ Long catalogId,
+ String warehouse,
+ String metastoreType,
+ String uri,
+ String hiveConfDir) {
+ this.catalogId = catalogId;
+ this.warehouse = warehouse;
+ this.metastoreType = metastoreType;
+ this.uri = uri;
+ this.hiveConfDir = hiveConfDir;
+ }
+
+ public Long getCatalogId() {
+ return catalogId;
+ }
+
+ public String getWarehouse() {
+ return warehouse;
+ }
+
+ public String getMetastoreType() {
+ return metastoreType;
+ }
+
+ public String getUri() {
+ return uri;
+ }
+
+ public String getHiveConfDir() {
+ return hiveConfDir;
+ }
+
+ public static CatalogEntity.Builder builder() {
+ return new Builder();
+ }
+
+ /** The builder for CatalogEntity. */
+ public static final class Builder {
+ private Long catalogId;
+ private String warehouse;
+ private String metastoreType;
+ @Nullable private String uri;
+ @Nullable private String hiveConfDir;
+
+ public Builder catalogId(Long catalogId) {
+ this.catalogId = catalogId;
+ return this;
+ }
+
+ public Builder warehouse(String warehouse) {
+ this.warehouse = warehouse;
+ return this;
+ }
+
+ public Builder metastoreType(String metastoreType) {
+ this.metastoreType = metastoreType;
+ return this;
+ }
+
+ public Builder uri(String uri) {
+ this.uri = uri;
+ return this;
+ }
+
+ public Builder hiveConfDir(String hiveConfDir) {
+ this.hiveConfDir = hiveConfDir;
+ return this;
+ }
+
+ public CatalogEntity build() {
+ return new CatalogEntity(catalogId, warehouse, metastoreType, uri,
hiveConfDir);
+ }
+ }
+}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
index c8930fd..7360df3 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
@@ -21,9 +21,9 @@ package org.apache.paimon.web.api.common;
/** paimon catalog properties. */
public class CatalogProperties {
- public static final String METASTORE_TYPE = "metastore";
+ public static final String METASTORE = "metastore";
- public static final String WAREHOUSE_LOCATION = "warehouse";
+ public static final String WAREHOUSE = "warehouse";
public static final String URI = "uri";
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/MetastoreType.java
similarity index 73%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/MetastoreType.java
index c8930fd..6d2d385 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/MetastoreType.java
@@ -18,14 +18,18 @@
package org.apache.paimon.web.api.common;
-/** paimon catalog properties. */
-public class CatalogProperties {
+/** Enum of catalog metastore type. */
+public enum MetastoreType {
+ FILE_SYSTEM("filesystem"),
+ HIVE("hive");
- public static final String METASTORE_TYPE = "metastore";
+ private final String value;
- public static final String WAREHOUSE_LOCATION = "warehouse";
+ MetastoreType(String value) {
+ this.value = value;
+ }
- public static final String URI = "uri";
-
- public static final String HIVE_CONF_DIR = "hive-conf-dir";
+ public String toString() {
+ return value;
+ }
}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/OperatorKind.java
similarity index 64%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/OperatorKind.java
index c8930fd..d549ed6 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/OperatorKind.java
@@ -18,14 +18,23 @@
package org.apache.paimon.web.api.common;
-/** paimon catalog properties. */
-public class CatalogProperties {
+/** Enum of operator kind. */
+public enum OperatorKind {
+ ADD_COLUMN("add"),
+ RENAME_COLUMN("rename"),
+ DROP_COLUMN("drop"),
+ UPDATE_COLUMN_COMMENT("update_comment"),
+ UPDATE_COLUMN_TYPE("update_type"),
+ UPDATE_COLUMN_POSITION("update_position"),
+ UPDATE_COLUMN_NULLABILITY("update_nullability");
- public static final String METASTORE_TYPE = "metastore";
+ private final String value;
- public static final String WAREHOUSE_LOCATION = "warehouse";
+ OperatorKind(String value) {
+ this.value = value;
+ }
- public static final String URI = "uri";
-
- public static final String HIVE_CONF_DIR = "hive-conf-dir";
+ public String value() {
+ return value;
+ }
}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/WriteMode.java
similarity index 73%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/WriteMode.java
index c8930fd..7085fc5 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/WriteMode.java
@@ -18,14 +18,18 @@
package org.apache.paimon.web.api.common;
-/** paimon catalog properties. */
-public class CatalogProperties {
+/** Enum of write mode. */
+public enum WriteMode {
+ STREAM("stream"),
+ BATCH("batch");
- public static final String METASTORE_TYPE = "metastore";
+ private final String value;
- public static final String WAREHOUSE_LOCATION = "warehouse";
+ WriteMode(String value) {
+ this.value = value;
+ }
- public static final String URI = "uri";
-
- public static final String HIVE_CONF_DIR = "hive-conf-dir";
+ public String getValue() {
+ return value;
+ }
}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/AlterTableEntity.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/AlterTableEntity.java
new file mode 100644
index 0000000..fea0d6a
--- /dev/null
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/AlterTableEntity.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.table;
+
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.web.api.common.OperatorKind;
+
+import javax.annotation.Nullable;
+
+/** alter table entity. */
+public class AlterTableEntity {
+
+ private final String columnName;
+ private final DataType type;
+ private final String comment;
+ private final String newColumn;
+ private final boolean isNullable;
+ private final SchemaChange.Move move;
+ private final OperatorKind kind;
+
+ public AlterTableEntity(
+ String columnName,
+ DataType type,
+ String comment,
+ String newColumn,
+ boolean isNullable,
+ SchemaChange.Move move,
+ OperatorKind kind) {
+ this.columnName = columnName;
+ this.type = type;
+ this.comment = comment;
+ this.newColumn = newColumn;
+ this.isNullable = isNullable;
+ this.move = move;
+ this.kind = kind;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public DataType getType() {
+ return type;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+
+ public String getNewColumn() {
+ return newColumn;
+ }
+
+ public boolean isNullable() {
+ return isNullable;
+ }
+
+ public SchemaChange.Move getMove() {
+ return move;
+ }
+
+ public OperatorKind getKind() {
+ return kind;
+ }
+
+ public static AlterTableEntity.Builder builder() {
+ return new Builder();
+ }
+
+ /** The builder for AlterTableEntity. */
+ public static class Builder {
+ private String columnName;
+ @Nullable private DataType type;
+ @Nullable private String comment;
+ @Nullable private String newColumn;
+ @Nullable private boolean isNullable;
+ @Nullable private SchemaChange.Move move;
+ private OperatorKind kind;
+
+ public Builder columnName(String columnName) {
+ this.columnName = columnName;
+ return this;
+ }
+
+ public Builder type(DataType type) {
+ this.type = type;
+ return this;
+ }
+
+ public Builder comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public Builder newColumn(String newColumn) {
+ this.newColumn = newColumn;
+ return this;
+ }
+
+ public Builder nullable(boolean isNullable) {
+ this.isNullable = isNullable;
+ return this;
+ }
+
+ public Builder move(SchemaChange.Move move) {
+ this.move = move;
+ return this;
+ }
+
+ public Builder kind(OperatorKind kind) {
+ this.kind = kind;
+ return this;
+ }
+
+ public AlterTableEntity build() {
+ return new AlterTableEntity(
+ columnName, type, comment, newColumn, isNullable, move,
kind);
+ }
+ }
+}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/ConsumerTableMetadata.java
similarity index 54%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to
paimon-web-api/src/main/java/org/apache/paimon/web/api/table/ConsumerTableMetadata.java
index c8930fd..12e52dd 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/ConsumerTableMetadata.java
@@ -16,16 +16,33 @@
* limitations under the License.
*/
-package org.apache.paimon.web.api.common;
+package org.apache.paimon.web.api.table;
-/** paimon catalog properties. */
-public class CatalogProperties {
+/** file table metadata. */
+public class ConsumerTableMetadata {
- public static final String METASTORE_TYPE = "metastore";
+ private String consumerId;
- public static final String WAREHOUSE_LOCATION = "warehouse";
+ private Long nextSnapshotId;
- public static final String URI = "uri";
+ public ConsumerTableMetadata(String consumerId, Long nextSnapshotId) {
+ this.consumerId = consumerId;
+ this.nextSnapshotId = nextSnapshotId;
+ }
- public static final String HIVE_CONF_DIR = "hive-conf-dir";
+ public String getConsumerId() {
+ return consumerId;
+ }
+
+ public void setConsumerId(String consumerId) {
+ this.consumerId = consumerId;
+ }
+
+ public Long getNextSnapshotId() {
+ return nextSnapshotId;
+ }
+
+ public void setNextSnapshotId(Long nextSnapshotId) {
+ this.nextSnapshotId = nextSnapshotId;
+ }
}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/FileTableMetadata.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/FileTableMetadata.java
new file mode 100644
index 0000000..3c0776e
--- /dev/null
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/FileTableMetadata.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.table;
+
+import javax.annotation.Nullable;
+
+import java.time.LocalDateTime;
+
+/** file table metadata. */
+public class FileTableMetadata {
+
+ private final String partition;
+ private final Integer bucket;
+ private final String filePath;
+ private final String fileFormat;
+ private final Long schemaId;
+ private final Integer level;
+ private final Long recordCount;
+ private final Long fileSizeInBytes;
+ private final String minKey;
+ private final String maxKey;
+ private final String nullValueCounts;
+ private final String minValueStats;
+ private final String maxValueStats;
+ private final LocalDateTime creationTime;
+
+ public FileTableMetadata(
+ String partition,
+ Integer bucket,
+ String filePath,
+ String fileFormat,
+ Long schemaId,
+ Integer level,
+ Long recordCount,
+ Long fileSizeInBytes,
+ String minKey,
+ String maxKey,
+ String nullValueCounts,
+ String minValueStats,
+ String maxValueStats,
+ LocalDateTime creationTime) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.filePath = filePath;
+ this.fileFormat = fileFormat;
+ this.schemaId = schemaId;
+ this.level = level;
+ this.recordCount = recordCount;
+ this.fileSizeInBytes = fileSizeInBytes;
+ this.minKey = minKey;
+ this.maxKey = maxKey;
+ this.nullValueCounts = nullValueCounts;
+ this.minValueStats = minValueStats;
+ this.maxValueStats = maxValueStats;
+ this.creationTime = creationTime;
+ }
+
+ public String getPartition() {
+ return partition;
+ }
+
+ public Integer getBucket() {
+ return bucket;
+ }
+
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public String getFileFormat() {
+ return fileFormat;
+ }
+
+ public Long getSchemaId() {
+ return schemaId;
+ }
+
+ public Integer getLevel() {
+ return level;
+ }
+
+ public Long getRecordCount() {
+ return recordCount;
+ }
+
+ public Long getFileSizeInBytes() {
+ return fileSizeInBytes;
+ }
+
+ public String getMinKey() {
+ return minKey;
+ }
+
+ public String getMaxKey() {
+ return maxKey;
+ }
+
+ public String getNullValueCounts() {
+ return nullValueCounts;
+ }
+
+ public String getMinValueStats() {
+ return minValueStats;
+ }
+
+ public String getMaxValueStats() {
+ return maxValueStats;
+ }
+
+ public LocalDateTime getCreationTime() {
+ return creationTime;
+ }
+
+ public static FileTableMetadata.Builder builder() {
+ return new Builder();
+ }
+
+ /** The builder for FileTableMetadata. */
+ public static final class Builder {
+ @Nullable private String partition;
+ private Integer bucket;
+ private String filePath;
+ private String fileFormat;
+ private Long schemaId;
+ private Integer level;
+ private Long recordCount;
+ private Long fileSizeInBytes;
+ private String minKey;
+ private String maxKey;
+ private String nullValueCounts;
+ @Nullable private String minValueStats;
+ @Nullable private String maxValueStats;
+ private LocalDateTime creationTime;
+
+ public Builder partition(String partition) {
+ this.partition = partition;
+ return this;
+ }
+
+ public Builder bucket(Integer bucket) {
+ this.bucket = bucket;
+ return this;
+ }
+
+ public Builder filePath(String filePath) {
+ this.filePath = filePath;
+ return this;
+ }
+
+ public Builder fileFormat(String fileFormat) {
+ this.fileFormat = fileFormat;
+ return this;
+ }
+
+ public Builder schemaId(Long schemaId) {
+ this.schemaId = schemaId;
+ return this;
+ }
+
+ public Builder level(Integer level) {
+ this.level = level;
+ return this;
+ }
+
+ public Builder recordCount(Long recordCount) {
+ this.recordCount = recordCount;
+ return this;
+ }
+
+ public Builder fileSizeInBytes(Long fileSizeInBytes) {
+ this.fileSizeInBytes = fileSizeInBytes;
+ return this;
+ }
+
+ public Builder minKey(String minKey) {
+ this.minKey = minKey;
+ return this;
+ }
+
+ public Builder maxKey(String maxKey) {
+ this.maxKey = maxKey;
+ return this;
+ }
+
+ public Builder nullValueCounts(String nullValueCounts) {
+ this.nullValueCounts = nullValueCounts;
+ return this;
+ }
+
+ public Builder minValueStats(String minValueStats) {
+ this.minValueStats = minValueStats;
+ return this;
+ }
+
+ public Builder maxValueStats(String maxValueStats) {
+ this.maxValueStats = maxValueStats;
+ return this;
+ }
+
+ public Builder creationTime(LocalDateTime creationTime) {
+ this.creationTime = creationTime;
+ return this;
+ }
+
+ public FileTableMetadata build() {
+ return new FileTableMetadata(
+ partition,
+ bucket,
+ filePath,
+ fileFormat,
+ schemaId,
+ level,
+ recordCount,
+ fileSizeInBytes,
+ minKey,
+ maxKey,
+ nullValueCounts,
+ minValueStats,
+ maxValueStats,
+ creationTime);
+ }
+ }
+}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/ManifestTableMetadata.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/ManifestTableMetadata.java
new file mode 100644
index 0000000..574899c
--- /dev/null
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/ManifestTableMetadata.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.table;
+
+/** manifest table metadata. */
+public class ManifestTableMetadata {
+
+ private final String fileName;
+ private final Long fileSize;
+ private final Long numAddedFiles;
+ private final Long numDeletedFiles;
+ private final Long schemaId;
+
+ public ManifestTableMetadata(
+ String fileName,
+ Long fileSize,
+ Long numAddedFiles,
+ Long numDeletedFiles,
+ Long schemaId) {
+ this.fileName = fileName;
+ this.fileSize = fileSize;
+ this.numAddedFiles = numAddedFiles;
+ this.numDeletedFiles = numDeletedFiles;
+ this.schemaId = schemaId;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public Long getFileSize() {
+ return fileSize;
+ }
+
+ public Long getNumAddedFiles() {
+ return numAddedFiles;
+ }
+
+ public Long getNumDeletedFiles() {
+ return numDeletedFiles;
+ }
+
+ public Long getSchemaId() {
+ return schemaId;
+ }
+
+ public static ManifestTableMetadata.Builder builder() {
+ return new Builder();
+ }
+
+ /** The builder for ManifestTableMetadata. */
+ public static final class Builder {
+ private String fileName;
+ private Long fileSize;
+ private Long numAddedFiles;
+ private Long numDeletedFiles;
+ private Long schemaId;
+
+ public Builder fileName(String fileName) {
+ this.fileName = fileName;
+ return this;
+ }
+
+ public Builder fileSize(Long fileSize) {
+ this.fileSize = fileSize;
+ return this;
+ }
+
+ public Builder numAddedFiles(Long numAddedFiles) {
+ this.numAddedFiles = numAddedFiles;
+ return this;
+ }
+
+ public Builder numDeletedFiles(Long numDeletedFiles) {
+ this.numDeletedFiles = numDeletedFiles;
+ return this;
+ }
+
+ public Builder schemaId(Long schemaId) {
+ this.schemaId = schemaId;
+ return this;
+ }
+
+ public ManifestTableMetadata build() {
+ return new ManifestTableMetadata(
+ fileName, fileSize, numAddedFiles, numDeletedFiles,
schemaId);
+ }
+ }
+}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/OptionTableMetadata.java
similarity index 60%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to
paimon-web-api/src/main/java/org/apache/paimon/web/api/table/OptionTableMetadata.java
index c8930fd..ec259e0 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/OptionTableMetadata.java
@@ -16,16 +16,33 @@
* limitations under the License.
*/
-package org.apache.paimon.web.api.common;
+package org.apache.paimon.web.api.table;
-/** paimon catalog properties. */
-public class CatalogProperties {
+/** options table metadata. */
+public class OptionTableMetadata {
- public static final String METASTORE_TYPE = "metastore";
+ private String key;
- public static final String WAREHOUSE_LOCATION = "warehouse";
+ private String value;
- public static final String URI = "uri";
+ public OptionTableMetadata(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
- public static final String HIVE_CONF_DIR = "hive-conf-dir";
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/SchemaTableMetadata.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/SchemaTableMetadata.java
new file mode 100644
index 0000000..e9744e5
--- /dev/null
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/SchemaTableMetadata.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.table;
+
+import javax.annotation.Nullable;
+
+/** schema table metadata. */
+public class SchemaTableMetadata {
+
+ private final Long schemaId;
+ private final String fields;
+ private final String partitionKeys;
+ private final String primaryKeys;
+ private final String options;
+ private final String comment;
+
+ public SchemaTableMetadata(
+ Long schemaId,
+ String fields,
+ String partitionKeys,
+ String primaryKeys,
+ String options,
+ String comment) {
+ this.schemaId = schemaId;
+ this.fields = fields;
+ this.partitionKeys = partitionKeys;
+ this.primaryKeys = primaryKeys;
+ this.options = options;
+ this.comment = comment;
+ }
+
+ public Long getSchemaId() {
+ return schemaId;
+ }
+
+ public String getFields() {
+ return fields;
+ }
+
+ public String getPartitionKeys() {
+ return partitionKeys;
+ }
+
+ public String getPrimaryKeys() {
+ return primaryKeys;
+ }
+
+ public String getOptions() {
+ return options;
+ }
+
+ public String getComment() {
+ return comment;
+ }
+
+ public static SchemaTableMetadata.Builder builder() {
+ return new Builder();
+ }
+
+ /** The builder for SchemaTableMetadata. */
+ public static final class Builder {
+ private Long schemaId;
+ private String fields;
+ private String partitionKeys;
+ private String primaryKeys;
+ private String options;
+ @Nullable private String comment;
+
+ public Builder schemaId(Long schemaId) {
+ this.schemaId = schemaId;
+ return this;
+ }
+
+ public Builder fields(String fields) {
+ this.fields = fields;
+ return this;
+ }
+
+ public Builder partitionKeys(String partitionKeys) {
+ this.partitionKeys = partitionKeys;
+ return this;
+ }
+
+ public Builder primaryKeys(String primaryKeys) {
+ this.primaryKeys = primaryKeys;
+ return this;
+ }
+
+ public Builder options(String options) {
+ this.options = options;
+ return this;
+ }
+
+ public Builder comment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public SchemaTableMetadata build() {
+ return new SchemaTableMetadata(
+ schemaId, fields, partitionKeys, primaryKeys, options,
comment);
+ }
+ }
+}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/SnapshotTableMetadata.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/SnapshotTableMetadata.java
new file mode 100644
index 0000000..9bc0bd4
--- /dev/null
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/SnapshotTableMetadata.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.table;
+
+import javax.annotation.Nullable;
+
+import java.time.LocalDateTime;
+
+/** snapshot table metadata. */
+public class SnapshotTableMetadata {
+
+ private final Long snapshotId;
+ private final String snapshotPath;
+ private final Long schemaId;
+ private final String commitUser;
+ private final Long commitIdentifier;
+ private final String commitKind;
+ private final LocalDateTime commitTime;
+ private final Long totalRecordCount;
+ private final Long deltaRecordCount;
+ private final Long changelogRecordCount;
+ private final Long watermark;
+
+ public SnapshotTableMetadata(
+ Long snapshotId,
+ String snapshotPath,
+ Long schemaId,
+ String commitUser,
+ Long commitIdentifier,
+ String commitKind,
+ LocalDateTime commitTime,
+ Long totalRecordCount,
+ Long deltaRecordCount,
+ Long changelogRecordCount,
+ Long watermark) {
+ this.snapshotId = snapshotId;
+ this.snapshotPath = snapshotPath;
+ this.schemaId = schemaId;
+ this.commitUser = commitUser;
+ this.commitIdentifier = commitIdentifier;
+ this.commitKind = commitKind;
+ this.commitTime = commitTime;
+ this.totalRecordCount = totalRecordCount;
+ this.deltaRecordCount = deltaRecordCount;
+ this.changelogRecordCount = changelogRecordCount;
+ this.watermark = watermark;
+ }
+
+ public Long getSnapshotId() {
+ return snapshotId;
+ }
+
+ public String getSnapshotPath() {
+ return snapshotPath;
+ }
+
+ public Long getSchemaId() {
+ return schemaId;
+ }
+
+ public String getCommitUser() {
+ return commitUser;
+ }
+
+ public Long getCommitIdentifier() {
+ return commitIdentifier;
+ }
+
+ public String getCommitKind() {
+ return commitKind;
+ }
+
+ public LocalDateTime getCommitTime() {
+ return commitTime;
+ }
+
+ public Long getTotalRecordCount() {
+ return totalRecordCount;
+ }
+
+ public Long getDeltaRecordCount() {
+ return deltaRecordCount;
+ }
+
+ public Long getChangelogRecordCount() {
+ return changelogRecordCount;
+ }
+
+ public Long getWatermark() {
+ return watermark;
+ }
+
+ public static SnapshotTableMetadata.Builder builder() {
+ return new Builder();
+ }
+
+ /** The builder for SnapshotTableMetadata. */
+ public static final class Builder {
+ private Long snapshotId;
+ private String snapshotPath;
+ private Long schemaId;
+ private String commitUser;
+ private Long commitIdentifier;
+ private String commitKind;
+ private LocalDateTime commitTime;
+ @Nullable private Long totalRecordCount;
+ @Nullable private Long deltaRecordCount;
+ @Nullable private Long changelogRecordCount;
+ @Nullable private Long watermark;
+
+ public Builder snapshotId(Long snapshotId) {
+ this.snapshotId = snapshotId;
+ return this;
+ }
+
+ public Builder snapshotPath(String snapshotPath) {
+ this.snapshotPath = snapshotPath;
+ return this;
+ }
+
+ public Builder schemaId(Long schemaId) {
+ this.schemaId = schemaId;
+ return this;
+ }
+
+ public Builder commitUser(String commitUser) {
+ this.commitUser = commitUser;
+ return this;
+ }
+
+ public Builder commitIdentifier(Long commitIdentifier) {
+ this.commitIdentifier = commitIdentifier;
+ return this;
+ }
+
+ public Builder commitKind(String commitKind) {
+ this.commitKind = commitKind;
+ return this;
+ }
+
+ public Builder commitTime(LocalDateTime commitTime) {
+ this.commitTime = commitTime;
+ return this;
+ }
+
+ public Builder totalRecordCount(Long totalRecordCount) {
+ this.totalRecordCount = totalRecordCount;
+ return this;
+ }
+
+ public Builder deltaRecordCount(Long deltaRecordCount) {
+ this.deltaRecordCount = deltaRecordCount;
+ return this;
+ }
+
+ public Builder changelogRecordCount(Long changelogRecordCount) {
+ this.changelogRecordCount = changelogRecordCount;
+ return this;
+ }
+
+ public Builder watermark(Long watermark) {
+ this.watermark = watermark;
+ return this;
+ }
+
+ public SnapshotTableMetadata build() {
+ return new SnapshotTableMetadata(
+ snapshotId,
+ snapshotPath,
+ schemaId,
+ commitUser,
+ commitIdentifier,
+ commitKind,
+ commitTime,
+ totalRecordCount,
+ deltaRecordCount,
+ changelogRecordCount,
+ watermark);
+ }
+ }
+}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TableManager.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TableManager.java
index 287755d..d09baa4 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TableManager.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TableManager.java
@@ -19,12 +19,46 @@
package org.apache.paimon.web.api.table;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableWrite;
+import org.apache.paimon.table.sink.WriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.web.api.common.CatalogEntity;
+import org.apache.paimon.web.api.common.CatalogProperties;
+import org.apache.paimon.web.api.common.MetastoreType;
+import org.apache.paimon.web.api.common.OperatorKind;
+import org.apache.paimon.web.api.common.WriteMode;
+import org.apache.paimon.web.common.annotation.VisibleForTesting;
+import org.apache.paimon.web.common.utils.ParameterValidationUtil;
import com.google.common.collect.ImmutableList;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,9 +66,19 @@ import java.util.Map;
/** paimon table manager. */
public class TableManager {
+ private static final String SNAPSHOTS = "snapshots";
+ private static final String SCHEMAS = "schemas";
+ private static final String OPTIONS = "options";
+ private static final String MANIFESTS = "manifests";
+ private static final String FILES = "files";
+ private static final String CONSUMER = "consumers";
+ private static final String TAGS = "tags";
+
public static void createTable(
Catalog catalog, String dbName, String tableName, TableMetadata
tableMetadata)
throws Catalog.TableAlreadyExistException,
Catalog.DatabaseNotExistException {
+ checkNotNull(catalog, dbName, tableName);
+
Schema.Builder schemaBuilder =
Schema.newBuilder()
.partitionKeys(
@@ -60,34 +104,515 @@ public class TableManager {
}
public static boolean tableExists(Catalog catalog, String dbName, String
tableName) {
+ checkNotNull(catalog, dbName, tableName);
+
Identifier identifier = Identifier.create(dbName, tableName);
return catalog.tableExists(identifier);
}
public static Table getTable(Catalog catalog, String dbName, String
tableName)
throws Catalog.TableNotExistException {
+ checkNotNull(catalog, dbName, tableName);
+
Identifier identifier = Identifier.create(dbName, tableName);
return catalog.getTable(identifier);
}
public static List<String> listTables(Catalog catalog, String dbName)
throws Catalog.DatabaseNotExistException {
+ ParameterValidationUtil.checkNotNull(
+ new SimpleEntry<>(catalog, () -> "Catalog"),
+ new SimpleEntry<>(dbName, () -> "Database name"));
return catalog.listTables(dbName);
}
public static void dropTable(Catalog catalog, String dbName, String
tableName)
throws Catalog.TableNotExistException {
+ checkNotNull(catalog, dbName, tableName);
+
Identifier identifier = Identifier.create(dbName, tableName);
catalog.dropTable(identifier, false);
}
public static void renameTable(Catalog catalog, String dbName, String
fromTable, String toTable)
throws Catalog.TableAlreadyExistException,
Catalog.TableNotExistException {
+ ParameterValidationUtil.checkNotNull(
+ new SimpleEntry<>(catalog, () -> "Catalog"),
+ new SimpleEntry<>(dbName, () -> "Database name"),
+ new SimpleEntry<>(fromTable, () -> "From table name"),
+ new SimpleEntry<>(toTable, () -> "To table name"));
+
Identifier fromTableIdentifier = Identifier.create(dbName, fromTable);
Identifier toTableIdentifier = Identifier.create(dbName, toTable);
catalog.renameTable(fromTableIdentifier, toTableIdentifier, false);
}
+ public static void setOptions(
+ Catalog catalog, String dbName, String tableName, Map<String,
String> options)
+ throws Catalog.ColumnAlreadyExistException,
Catalog.TableNotExistException,
+ Catalog.ColumnNotExistException {
+ checkNotNull(catalog, dbName, tableName);
+
+ Identifier identifier = Identifier.create(dbName, tableName);
+
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ Map<String, String> filteredOptions = handleOptions(options);
+ for (String key : filteredOptions.keySet()) {
+ SchemaChange addOption = SchemaChange.setOption(key,
filteredOptions.get(key));
+ schemaChanges.add(addOption);
+ }
+
+ catalog.alterTable(identifier, schemaChanges, false);
+ }
+
+ public static void removeOptions(
+ Catalog catalog, String dbName, String tableName, Map<String,
String> options)
+ throws Catalog.ColumnAlreadyExistException,
Catalog.TableNotExistException,
+ Catalog.ColumnNotExistException {
+ checkNotNull(catalog, dbName, tableName);
+
+ Identifier identifier = Identifier.create(dbName, tableName);
+
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ Map<String, String> filteredOptions = handleOptions(options);
+ for (String key : filteredOptions.keySet()) {
+ SchemaChange addOption = SchemaChange.removeOption(key);
+ schemaChanges.add(addOption);
+ }
+
+ catalog.alterTable(identifier, schemaChanges, false);
+ }
+
+ private static SchemaChange addColumn(AlterTableEntity entity) {
+ ParameterValidationUtil.checkNotNull(
+ new SimpleEntry<>(entity.getColumnName(), () -> "Column name"),
+ new SimpleEntry<>(entity.getType(), () -> "Column type"));
+ return SchemaChange.addColumn(
+ entity.getColumnName(), entity.getType(), entity.getComment());
+ }
+
+ private static SchemaChange renameColumn(
+ Catalog catalog, String dbName, String tableName, AlterTableEntity
entity)
+ throws Catalog.TableNotExistException, IOException {
+ ParameterValidationUtil.checkNotNull(
+ new SimpleEntry<>(entity.getColumnName(), () -> "Column name"),
+ new SimpleEntry<>(entity.getNewColumn(), () -> "New column
name"),
+ new SimpleEntry<>(catalog, () -> "Catalog"),
+ new SimpleEntry<>(dbName, () -> "Database name"),
+ new SimpleEntry<>(tableName, () -> "Table name"));
+ validateColumnExistence(catalog, dbName, tableName,
entity.getColumnName());
+ return SchemaChange.renameColumn(entity.getColumnName(),
entity.getNewColumn());
+ }
+
+ private static SchemaChange dropColumn(
+ Catalog catalog, String dbName, String tableName, AlterTableEntity
entity)
+ throws Catalog.TableNotExistException, IOException {
+ ParameterValidationUtil.checkNotNull(
+ new SimpleEntry<>(entity.getColumnName(), () -> "Column name"),
+ new SimpleEntry<>(catalog, () -> "Catalog"),
+ new SimpleEntry<>(dbName, () -> "Database name"),
+ new SimpleEntry<>(tableName, () -> "Table name"));
+ validateColumnExistence(catalog, dbName, tableName,
entity.getColumnName());
+ return SchemaChange.dropColumn(entity.getColumnName());
+ }
+
+ private static SchemaChange updateColumnComment(
+ Catalog catalog, String dbName, String tableName, AlterTableEntity
entity)
+ throws Catalog.TableNotExistException, IOException {
+ ParameterValidationUtil.checkNotNull(
+ new SimpleEntry<>(entity.getColumnName(), () -> "Column name"),
+ new SimpleEntry<>(catalog, () -> "Catalog"),
+ new SimpleEntry<>(dbName, () -> "Database name"),
+ new SimpleEntry<>(tableName, () -> "Table name"));
+ validateColumnExistence(catalog, dbName, tableName,
entity.getColumnName());
+ return SchemaChange.updateColumnComment(entity.getColumnName(),
entity.getComment());
+ }
+
+ private static SchemaChange updateColumnType(
+ Catalog catalog, String dbName, String tableName, AlterTableEntity
entity)
+ throws Catalog.TableNotExistException, IOException {
+ ParameterValidationUtil.checkNotNull(
+ new SimpleEntry<>(entity.getColumnName(), () -> "Column name"),
+ new SimpleEntry<>(entity.getType(), () -> "Column type"),
+ new SimpleEntry<>(catalog, () -> "Catalog"),
+ new SimpleEntry<>(dbName, () -> "Database name"),
+ new SimpleEntry<>(tableName, () -> "Table name"));
+ validateColumnExistence(catalog, dbName, tableName,
entity.getColumnName());
+ return SchemaChange.updateColumnType(entity.getColumnName(),
entity.getType());
+ }
+
+ private static SchemaChange updateColumnPosition(AlterTableEntity entity) {
+ ParameterValidationUtil.checkNotNull(new
SimpleEntry<>(entity.getMove(), () -> "Move"));
+ return SchemaChange.updateColumnPosition(entity.getMove());
+ }
+
+ private static SchemaChange updateColumnNullability(
+ Catalog catalog, String dbName, String tableName, AlterTableEntity
entity)
+ throws Catalog.TableNotExistException, IOException {
+ ParameterValidationUtil.checkNotNull(
+ new SimpleEntry<>(entity.getColumnName(), () -> "Column name"),
+ new SimpleEntry<>(catalog, () -> "Catalog"),
+ new SimpleEntry<>(dbName, () -> "Database name"),
+ new SimpleEntry<>(tableName, () -> "Table name"));
+ validateColumnExistence(catalog, dbName, tableName,
entity.getColumnName());
+ return SchemaChange.updateColumnNullability(entity.getColumnName(),
entity.isNullable());
+ }
+
+ private static SchemaChange performAlterTableAction(
+ Catalog catalog, String dbName, String tableName, AlterTableEntity
entity)
+ throws Catalog.TableNotExistException, IOException {
+ OperatorKind kind = entity.getKind();
+
+ switch (kind) {
+ case ADD_COLUMN:
+ return addColumn(entity);
+ case RENAME_COLUMN:
+ return renameColumn(catalog, dbName, tableName, entity);
+ case DROP_COLUMN:
+ return dropColumn(catalog, dbName, tableName, entity);
+ case UPDATE_COLUMN_COMMENT:
+ return updateColumnComment(catalog, dbName, tableName, entity);
+ case UPDATE_COLUMN_TYPE:
+ return updateColumnType(catalog, dbName, tableName, entity);
+ case UPDATE_COLUMN_POSITION:
+ return updateColumnPosition(entity);
+ case UPDATE_COLUMN_NULLABILITY:
+ return updateColumnNullability(catalog, dbName, tableName,
entity);
+ default:
+ return null;
+ }
+ }
+
+ public static void alterTable(
+ Catalog catalog, String dbName, String tableName,
List<AlterTableEntity> entities)
+ throws Catalog.TableNotExistException, IOException,
Catalog.ColumnAlreadyExistException,
+ Catalog.ColumnNotExistException {
+ checkNotNull(catalog, dbName, tableName);
+
+ Identifier identifier = Identifier.create(dbName, tableName);
+
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ for (AlterTableEntity entity : entities) {
+ SchemaChange schemaChange = performAlterTableAction(catalog,
dbName, tableName, entity);
+ schemaChanges.add(schemaChange);
+ }
+
+ catalog.alterTable(identifier, schemaChanges, false);
+ }
+
+ @VisibleForTesting
+ private static SchemaTableMetadata getLatestSchema(
+ Catalog catalog, String dbName, String tableName)
+ throws Catalog.TableNotExistException, IOException {
+ List<SchemaTableMetadata> schemas = listSchemas(catalog, dbName,
tableName);
+ return schemas.stream()
+
.max(Comparator.comparingLong(SchemaTableMetadata::getSchemaId))
+ .orElse(null);
+ }
+
+ public static List<SnapshotTableMetadata> listSnapshots(
+ Catalog catalog, CatalogEntity catalogEntity, String dbName,
String tableName)
+ throws Catalog.TableNotExistException, IOException {
+ checkNotNull(catalog, dbName, tableName);
+
+ List<SnapshotTableMetadata> snapshots = new ArrayList<>();
+
+ Table table = getTable(catalog, dbName, "`" + tableName + "$" +
SNAPSHOTS + "`");
+
+ SnapshotManager snapshotManager =
+ getSnapshotManager(catalog, catalogEntity, dbName, tableName);
+
+ RecordReader<InternalRow> reader = getReader(table);
+ reader.forEachRemaining(
+ row -> {
+ SnapshotTableMetadata snapshotTableMetadata =
+ SnapshotTableMetadata.builder()
+ .snapshotId(row.getLong(1))
+ .schemaId(row.getLong(2))
+ .commitUser(row.getString(3).toString())
+ .commitIdentifier(row.getLong(4))
+ .commitKind(row.getString(5).toString())
+ .commitTime(row.getTimestamp(6,
3).toLocalDateTime())
+ .totalRecordCount(row.getLong(7))
+ .deltaRecordCount(row.getLong(8))
+ .changelogRecordCount(row.getLong(9))
+ .watermark(row.getLong(10))
+ .snapshotPath(
+
snapshotManager.snapshotPath(row.getLong(1)).toString())
+ .build();
+ snapshots.add(snapshotTableMetadata);
+ });
+
+ return snapshots;
+ }
+
+ public static List<SchemaTableMetadata> listSchemas(
+ Catalog catalog, String dbName, String tableName)
+ throws Catalog.TableNotExistException, IOException {
+ checkNotNull(catalog, dbName, tableName);
+
+ List<SchemaTableMetadata> schemas = new ArrayList<>();
+
+ Table table = getTable(catalog, dbName, "`" + tableName + "$" +
SCHEMAS + "`");
+
+ RecordReader<InternalRow> reader = getReader(table);
+ reader.forEachRemaining(
+ row -> {
+ SchemaTableMetadata schemaTableMetadata =
+ SchemaTableMetadata.builder()
+ .schemaId(row.getLong(1))
+ .fields(row.getString(2).toString())
+ .partitionKeys(row.getString(3).toString())
+ .primaryKeys(row.getString(4).toString())
+ .options(row.getString(5).toString())
+ .comment(row.getString(6).toString())
+ .build();
+ schemas.add(schemaTableMetadata);
+ });
+
+ return schemas;
+ }
+
+ public static List<OptionTableMetadata> listOptions(
+ Catalog catalog, String dbName, String tableName)
+ throws Catalog.TableNotExistException, IOException {
+ checkNotNull(catalog, dbName, tableName);
+
+ List<OptionTableMetadata> options = new ArrayList<>();
+
+ Table table = getTable(catalog, dbName, "`" + tableName + "$" +
OPTIONS + "`");
+
+ RecordReader<InternalRow> reader = getReader(table);
+ reader.forEachRemaining(
+ row -> {
+ OptionTableMetadata optionsTableMetadata =
+ new OptionTableMetadata(
+ row.getString(1).toString(),
row.getString(2).toString());
+ options.add(optionsTableMetadata);
+ });
+
+ return options;
+ }
+
+ public static List<ManifestTableMetadata> listManifests(
+ Catalog catalog, String dbName, String tableName)
+ throws Catalog.TableNotExistException, IOException {
+ checkNotNull(catalog, dbName, tableName);
+
+ List<ManifestTableMetadata> manifests = new ArrayList<>();
+
+ Table table = getTable(catalog, dbName, "`" + tableName + "$" +
MANIFESTS + "`");
+
+ RecordReader<InternalRow> reader = getReader(table);
+ reader.forEachRemaining(
+ row -> {
+ ManifestTableMetadata manifestTableMetadata =
+ ManifestTableMetadata.builder()
+ .fileName(row.getString(1).toString())
+ .fileSize(row.getLong(2))
+ .numAddedFiles(row.getLong(3))
+ .numDeletedFiles(row.getLong(4))
+ .schemaId(row.getLong(5))
+ .build();
+ manifests.add(manifestTableMetadata);
+ });
+
+ return manifests;
+ }
+
+ public static List<FileTableMetadata> listFiles(
+ Catalog catalog, String dbName, String tableName)
+ throws Catalog.TableNotExistException, IOException {
+ checkNotNull(catalog, dbName, tableName);
+
+ List<FileTableMetadata> files = new ArrayList<>();
+
+ Table table = getTable(catalog, dbName, "`" + tableName + "$" + FILES
+ "`");
+
+ RecordReader<InternalRow> reader = getReader(table);
+ reader.forEachRemaining(
+ row -> {
+ FileTableMetadata fileTableMetadata =
+ FileTableMetadata.builder()
+ .partition(row.getString(1).toString())
+ .bucket(row.getInt(2))
+ .filePath(row.getString(3).toString())
+ .fileFormat(row.getString(4).toString())
+ .schemaId(row.getLong(5))
+ .level(row.getInt(6))
+ .fileSizeInBytes(row.getLong(7))
+ .minKey(row.getString(8).toString())
+ .maxKey(row.getString(9).toString())
+
.nullValueCounts(row.getString(10).toString())
+
.minValueStats(row.getString(11).toString())
+
.maxValueStats(row.getString(12).toString())
+ .creationTime(row.getTimestamp(13,
6).toLocalDateTime())
+ .build();
+ files.add(fileTableMetadata);
+ });
+
+ return files;
+ }
+
+ public static List<ConsumerTableMetadata> listConsumers(
+ Catalog catalog, String dbName, String tableName)
+ throws Catalog.TableNotExistException, IOException {
+ checkNotNull(catalog, dbName, tableName);
+
+ List<ConsumerTableMetadata> consumers = new ArrayList<>();
+
+ Table table = getTable(catalog, dbName, "`" + tableName + "$" +
CONSUMER + "`");
+
+ RecordReader<InternalRow> reader = getReader(table);
+
+ reader.forEachRemaining(
+ row -> {
+ ConsumerTableMetadata consumerTableMetadata =
+ new
ConsumerTableMetadata(row.getString(1).toString(), row.getLong(2));
+ consumers.add(consumerTableMetadata);
+ });
+ return consumers;
+ }
+
+ public static List<TagTableMetadata> listTags(Catalog catalog, String
dbName, String tableName)
+ throws Catalog.TableNotExistException, IOException {
+ checkNotNull(catalog, dbName, tableName);
+
+ List<TagTableMetadata> tags = new ArrayList<>();
+
+ Table table = getTable(catalog, dbName, "`" + tableName + "$" + TAGS +
"`");
+
+ RecordReader<InternalRow> reader = getReader(table);
+ reader.forEachRemaining(
+ row -> {
+ TagTableMetadata tagTableMetadata =
+ TagTableMetadata.builder()
+ .tagName(row.getString(1).toString())
+ .snapshotId(row.getLong(2))
+ .schemaId(row.getLong(3))
+ .createTime(row.getTimestamp(4,
3).toLocalDateTime())
+ .recordCount(row.getLong(5))
+ .build();
+ tags.add(tagTableMetadata);
+ });
+
+ return tags;
+ }
+
+ @VisibleForTesting
+ private static SnapshotManager getSnapshotManager(
+ Catalog catalog, CatalogEntity catalogEntity, String dbName,
String tableName)
+ throws IOException {
+ String warehouse = catalogEntity.getWarehouse();
+
+ FileIO fileIO =
+ FileIO.get(
+ new Path(warehouse),
+ CatalogContext.create(buildOptions(catalog,
catalogEntity)));
+
+ String tablePath = warehouse + "/" + dbName + ".db" + "/" + tableName;
+ return new SnapshotManager(fileIO, new Path(tablePath));
+ }
+
+ private static Options buildOptions(Catalog catalog, CatalogEntity
catalogEntity) {
+ Options options = new Options();
+ if (catalog instanceof FileSystemCatalog) {
+ options.set(CatalogProperties.WAREHOUSE,
catalogEntity.getWarehouse());
+ } else if (catalog instanceof HiveCatalog) {
+ options.set(CatalogProperties.WAREHOUSE,
catalogEntity.getWarehouse());
+ options.set(CatalogProperties.METASTORE,
MetastoreType.HIVE.toString());
+ options.set(CatalogProperties.URI, catalogEntity.getUri());
+ options.set(CatalogProperties.HIVE_CONF_DIR,
catalogEntity.getHiveConfDir());
+ }
+ return options;
+ }
+
+ @VisibleForTesting
+ private static RecordReader<InternalRow> getReader(Table table) {
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ TableRead tableRead = readBuilder.newRead();
+ try {
+ return tableRead.createReader(splits);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @VisibleForTesting
+ private static WriteBuilder getWriteBuilder(
+ Table table, String writeMode, @Nullable Map<String, String>
staticPartition) {
+ if (writeMode.equals(WriteMode.BATCH.getValue())) {
+ return table.newBatchWriteBuilder().withOverwrite(staticPartition);
+ } else {
+ return table.newStreamWriteBuilder();
+ }
+ }
+
+ public static TableWrite getBatchTableWriter(
+ Table table, @Nullable Map<String, String> staticPartition) {
+ BatchWriteBuilder writeBuilder =
+ (BatchWriteBuilder)
+ getWriteBuilder(table, WriteMode.BATCH.getValue(),
staticPartition);
+ return writeBuilder.newWrite();
+ }
+
+ public static TableWrite getStreamTableWriter(Table table) {
+ StreamWriteBuilder writeBuilder =
+ (StreamWriteBuilder) getWriteBuilder(table,
WriteMode.STREAM.getValue(), null);
+ return writeBuilder.newWrite();
+ }
+
+ public static void batchWrite(
+ List<GenericRow> records,
+ Catalog catalog,
+ String dbName,
+ String tableName,
+ @Nullable Map<String, String> staticPartition)
+ throws Exception {
+ checkNotNull(catalog, dbName, tableName);
+
+ BatchWriteBuilder writeBuilder =
+ (BatchWriteBuilder)
+ getWriteBuilder(
+ getTable(catalog, dbName, tableName),
+ WriteMode.BATCH.getValue(),
+ staticPartition);
+
+ List<CommitMessage> commitMessages;
+ try (BatchTableWrite writer = writeBuilder.newWrite()) {
+
+ for (GenericRow record : records) {
+ writer.write(record);
+ }
+
+ commitMessages = writer.prepareCommit();
+ }
+
+ try (BatchTableCommit commit = writeBuilder.newCommit()) {
+ commit.commit(commitMessages);
+ }
+ }
+
+ private static void checkNotNull(Catalog catalog, String dbName, String
tableName) {
+ ParameterValidationUtil.checkNotNull(
+ new SimpleEntry<>(catalog, () -> "Catalog"),
+ new SimpleEntry<>(dbName, () -> "Database name"),
+ new SimpleEntry<>(tableName, () -> "Table name"));
+ }
+
+ private static void validateColumnExistence(
+ Catalog catalog, String dbName, String tableName, String
columnName)
+ throws Catalog.TableNotExistException, IOException {
+ SchemaTableMetadata latestSchema = getLatestSchema(catalog, dbName,
tableName);
+ if (!latestSchema.getFields().contains(columnName)) {
+ throw new RuntimeException("Column not found: " + columnName);
+ }
+ }
+
private static Map<String, String> handleOptions(Map<String, String>
options) {
List<String> keys = TableOptionExtractor.keys();
Map<String, String> filteredOptions = new HashMap<>();
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TagTableMetadata.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TagTableMetadata.java
new file mode 100644
index 0000000..f26170c
--- /dev/null
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/table/TagTableMetadata.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.table;
+
+import javax.annotation.Nullable;
+
+import java.time.LocalDateTime;
+
+/** file table metadata. */
+public class TagTableMetadata {
+
+ private final String tagName;
+ private final Long snapshotId;
+ private final Long schemaId;
+ private final LocalDateTime createTime;
+ private final Long recordCount;
+
+ public TagTableMetadata(
+ String tagName,
+ Long snapshotId,
+ Long schemaId,
+ LocalDateTime createTime,
+ Long recordCount) {
+ this.tagName = tagName;
+ this.snapshotId = snapshotId;
+ this.schemaId = schemaId;
+ this.createTime = createTime;
+ this.recordCount = recordCount;
+ }
+
+ public String getTagName() {
+ return tagName;
+ }
+
+ public Long getSnapshotId() {
+ return snapshotId;
+ }
+
+ public Long getSchemaId() {
+ return schemaId;
+ }
+
+ public LocalDateTime getCreateTime() {
+ return createTime;
+ }
+
+ public Long getRecordCount() {
+ return recordCount;
+ }
+
+ public static TagTableMetadata.Builder builder() {
+ return new Builder();
+ }
+
+ /** The builder for TagTableMetadata. */
+ public static final class Builder {
+ private String tagName;
+ private Long snapshotId;
+ private Long schemaId;
+ private LocalDateTime createTime;
+ @Nullable private Long recordCount;
+
+ public Builder tagName(String tagName) {
+ this.tagName = tagName;
+ return this;
+ }
+
+ public Builder snapshotId(Long snapshotId) {
+ this.snapshotId = snapshotId;
+ return this;
+ }
+
+ public Builder schemaId(Long schemaId) {
+ this.schemaId = schemaId;
+ return this;
+ }
+
+ public Builder createTime(LocalDateTime createTime) {
+ this.createTime = createTime;
+ return this;
+ }
+
+ public Builder recordCount(Long recordCount) {
+ this.recordCount = recordCount;
+ return this;
+ }
+
+ public TagTableMetadata build() {
+ return new TagTableMetadata(tagName, snapshotId, schemaId,
createTime, recordCount);
+ }
+ }
+}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/annotation/VisibleForTesting.java
similarity index 51%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to
paimon-web-common/src/main/java/org/apache/paimon/web/common/annotation/VisibleForTesting.java
index c8930fd..c453fef 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/annotation/VisibleForTesting.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,16 +16,20 @@
* limitations under the License.
*/
-package org.apache.paimon.web.api.common;
+package org.apache.paimon.web.common.annotation;
-/** paimon catalog properties. */
-public class CatalogProperties {
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
- public static final String METASTORE_TYPE = "metastore";
-
- public static final String WAREHOUSE_LOCATION = "warehouse";
-
- public static final String URI = "uri";
-
- public static final String HIVE_CONF_DIR = "hive-conf-dir";
-}
+/**
+ * This annotations declares that a function, field, constructor, or entire
type, is only visible
+ * for testing purposes.
+ *
+ * <p>This annotation is typically attached when for example a method should
be {@code private}
+ * (because it is not intended to be called externally), but cannot be
declared private, because
+ * some tests need to have access to it.
+ */
+@Documented
+@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD,
ElementType.CONSTRUCTOR})
+public @interface VisibleForTesting {}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/utils/ParameterValidationUtil.java
similarity index 61%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
copy to
paimon-web-common/src/main/java/org/apache/paimon/web/common/utils/ParameterValidationUtil.java
index c8930fd..aebad3e 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/common/CatalogProperties.java
+++
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/utils/ParameterValidationUtil.java
@@ -16,16 +16,20 @@
* limitations under the License.
*/
-package org.apache.paimon.web.api.common;
+package org.apache.paimon.web.common.utils;
-/** paimon catalog properties. */
-public class CatalogProperties {
+import java.util.Map;
+import java.util.function.Supplier;
- public static final String METASTORE_TYPE = "metastore";
+/** parameter validation util. */
+public class ParameterValidationUtil {
- public static final String WAREHOUSE_LOCATION = "warehouse";
-
- public static final String URI = "uri";
-
- public static final String HIVE_CONF_DIR = "hive-conf-dir";
+ @SafeVarargs
+ public static void checkNotNull(Map.Entry<Object, Supplier<String>>...
entries) {
+ for (Map.Entry<Object, Supplier<String>> entry : entries) {
+ if (entry.getKey() == null) {
+ throw new IllegalArgumentException(entry.getValue().get() + "
can not be null.");
+ }
+ }
+ }
}