This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new de7a50dc91 [core] Support partition API (#4786)
de7a50dc91 is described below
commit de7a50dc9150bddddfcdabc5c6cd4427745273ae
Author: jerry <[email protected]>
AuthorDate: Mon Dec 30 19:16:10 2024 +0800
[core] Support partition API (#4786)
---
.../paimon/utils/InternalRowPartitionComputer.java | 2 +-
.../java/org/apache/paimon/rest/HttpClient.java | 17 ++
.../java/org/apache/paimon/rest/RESTCatalog.java | 148 +++++++++++--
.../java/org/apache/paimon/rest/RESTClient.java | 2 +
.../java/org/apache/paimon/rest/ResourcePaths.java | 11 +
.../rest/requests/CreatePartitionRequest.java | 61 +++++
.../paimon/rest/requests/DropPartitionRequest.java | 49 +++++
.../rest/responses/ListPartitionsResponse.java | 49 +++++
.../paimon/rest/responses/PartitionResponse.java | 93 ++++++++
.../org/apache/paimon/rest/MockRESTMessage.java | 43 +++-
.../org/apache/paimon/rest/RESTCatalogTest.java | 213 +++++++++++++-----
.../apache/paimon/rest/RESTObjectMapperTest.java | 45 ++++
paimon-open-api/rest-catalog-open-api.yaml | 245 +++++++++++++++++----
.../paimon/open/api/RESTCatalogController.java | 80 +++++++
14 files changed, 931 insertions(+), 127 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
index 6bb26d7613..f4aad8f03f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
@@ -111,7 +111,7 @@ public class InternalRowPartitionComputer {
List<String> fieldNames = partType.getFieldNames();
for (Map.Entry<String, String> entry : spec.entrySet()) {
Object value =
- defaultPartValue.equals(entry.getValue())
+ defaultPartValue != null &&
defaultPartValue.equals(entry.getValue())
? null
: castFromString(
entry.getValue(),
partType.getField(entry.getKey()).type());
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
index 87f3fad9b2..d92cab5102 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -94,6 +94,23 @@ public class HttpClient implements RESTClient {
return exec(request, null);
}
+ @Override
+ public <T extends RESTResponse> T delete(
+ String path, RESTRequest body, Map<String, String> headers) {
+ try {
+ RequestBody requestBody = buildRequestBody(body);
+ Request request =
+ new Request.Builder()
+ .url(uri + path)
+ .delete(requestBody)
+ .headers(Headers.of(headers))
+ .build();
+ return exec(request, null);
+ } catch (JsonProcessingException e) {
+ throw new RESTException(e, "build request failed.");
+ }
+ }
+
@Override
public void close() throws IOException {
okHttpClient.dispatcher().cancelAll();
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index e0b3dccdd5..c430e303b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -26,9 +26,12 @@ import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
@@ -41,7 +44,9 @@ import
org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreatePartitionRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.DropPartitionRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.ConfigResponse;
@@ -49,7 +54,9 @@ import
org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListPartitionsResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.rest.responses.PartitionResponse;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.TableSchema;
@@ -58,10 +65,11 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
-import
org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -71,15 +79,20 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
+import static
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
@@ -132,7 +145,8 @@ public class RESTCatalog implements Catalog {
Map<String, String> initHeaders =
RESTUtil.merge(
configHeaders(catalogOptions.toMap()),
this.catalogAuth.getHeaders());
- Options options = new Options(fetchOptionsFromServer(initHeaders,
initHeaders));
+ Options options =
+ new Options(fetchOptionsFromServer(initHeaders,
catalogContext.options().toMap()));
this.context =
CatalogContext.create(
options, catalogContext.preferIO(),
catalogContext.fallbackIO());
@@ -141,20 +155,6 @@ public class RESTCatalog implements Catalog {
this.fileIO = getFileIOFromOptions(context);
}
- private static FileIO getFileIOFromOptions(CatalogContext context) {
- try {
- Options options = context.options();
- String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
- Path warehousePath = new Path(warehouseStr);
- CatalogContext contextWithNewOptions =
- CatalogContext.create(options, context.preferIO(),
context.fallbackIO());
- return FileIO.get(warehousePath, contextWithNewOptions);
- } catch (IOException e) {
- LOG.warn("Can not get FileIO from options.");
- throw new RuntimeException(e);
- }
- }
-
@Override
public String warehouse() {
return context.options().get(CatalogOptions.WAREHOUSE);
@@ -360,17 +360,43 @@ public class RESTCatalog implements Catalog {
@Override
public void createPartition(Identifier identifier, Map<String, String>
partitionSpec)
throws TableNotExistException {
- throw new UnsupportedOperationException();
+ try {
+ CreatePartitionRequest request = new
CreatePartitionRequest(identifier, partitionSpec);
+ client.post(
+ resourcePaths.partitions(
+ identifier.getDatabaseName(),
identifier.getTableName()),
+ request,
+ PartitionResponse.class,
+ headers());
+ } catch (NoSuchResourceException e) {
+ throw new TableNotExistException(identifier);
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
+ }
}
@Override
public void dropPartition(Identifier identifier, Map<String, String>
partitions)
- throws TableNotExistException, PartitionNotExistException {}
+ throws TableNotExistException, PartitionNotExistException {
+ checkNotSystemTable(identifier, "dropPartition");
+ dropPartitionMetadata(identifier, partitions);
+ Table table = getTable(identifier);
+ cleanPartitionsInFileSystem(table, partitions);
+ }
@Override
public List<PartitionEntry> listPartitions(Identifier identifier)
throws TableNotExistException {
- throw new UnsupportedOperationException();
+ FileStoreTable table = (FileStoreTable) getTable(identifier);
+ boolean whetherSupportListPartitions =
+ Boolean.parseBoolean(
+
table.options().get(CoreOptions.METASTORE_PARTITIONED_TABLE.key()));
+ if (whetherSupportListPartitions) {
+ RowType rowType = table.schema().logicalPartitionType();
+ return listPartitionsFromServer(identifier, rowType);
+ } else {
+ return
getTable(identifier).newReadBuilder().newScan().listPartitionEntries();
+ }
}
@Override
@@ -388,16 +414,14 @@ public class RESTCatalog implements Catalog {
}
}
- @VisibleForTesting
- Map<String, String> fetchOptionsFromServer(
+ protected Map<String, String> fetchOptionsFromServer(
Map<String, String> headers, Map<String, String> clientProperties)
{
ConfigResponse response =
client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class,
headers);
return response.merge(clientProperties);
}
- @VisibleForTesting
- Table getDataOrFormatTable(Identifier identifier) throws
TableNotExistException {
+ private Table getDataOrFormatTable(Identifier identifier) throws
TableNotExistException {
Preconditions.checkArgument(identifier.getSystemTableName() == null);
GetTableResponse response = getTableResponse(identifier);
FileStoreTable table =
@@ -420,8 +444,42 @@ public class RESTCatalog implements Catalog {
return table;
}
- protected GetTableResponse getTableResponse(Identifier identifier)
+ private List<PartitionEntry> listPartitionsFromServer(Identifier
identifier, RowType rowType)
throws TableNotExistException {
+ try {
+ ListPartitionsResponse response =
+ client.get(
+ resourcePaths.partitions(
+ identifier.getDatabaseName(),
identifier.getTableName()),
+ ListPartitionsResponse.class,
+ headers());
+ if (response != null && response.getPartitions() != null) {
+ return response.getPartitions().stream()
+ .map(p -> convertToPartitionEntry(p, rowType))
+ .collect(Collectors.toList());
+ } else {
+ return Collections.emptyList();
+ }
+ } catch (NoSuchResourceException e) {
+ throw new TableNotExistException(identifier);
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
+ }
+ }
+
+ private void cleanPartitionsInFileSystem(Table table, Map<String, String>
partitions) {
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ try (FileStoreCommit commit =
+ fileStoreTable
+ .store()
+ .newCommit(
+
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
+ commit.dropPartitions(
+ Collections.singletonList(partitions),
BatchWriteBuilder.COMMIT_IDENTIFIER);
+ }
+ }
+
+ private GetTableResponse getTableResponse(Identifier identifier) throws
TableNotExistException {
try {
return client.get(
resourcePaths.table(identifier.getDatabaseName(),
identifier.getTableName()),
@@ -434,6 +492,23 @@ public class RESTCatalog implements Catalog {
}
}
+ private boolean dropPartitionMetadata(Identifier identifier, Map<String,
String> partitions)
+ throws TableNoPermissionException, PartitionNotExistException {
+ try {
+ DropPartitionRequest request = new
DropPartitionRequest(partitions);
+ client.delete(
+ resourcePaths.partitions(
+ identifier.getDatabaseName(),
identifier.getTableName()),
+ request,
+ headers());
+ return true;
+ } catch (NoSuchResourceException ignore) {
+ throw new PartitionNotExistException(identifier, partitions);
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
+ }
+ }
+
private static Map<String, String> configHeaders(Map<String, String>
properties) {
return RESTUtil.extractPrefixMap(properties, "header.");
}
@@ -464,4 +539,29 @@ public class RESTCatalog implements Catalog {
return refreshExecutor;
}
+
+ private PartitionEntry convertToPartitionEntry(PartitionResponse
partition, RowType rowType) {
+ InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+ GenericRow row = convertSpecToInternalRow(partition.getSpec(),
rowType, null);
+ return new PartitionEntry(
+ serializer.toBinaryRow(row).copy(),
+ partition.getRecordCount(),
+ partition.getFileSizeInBytes(),
+ partition.getFileCount(),
+ partition.getLastFileCreationTime());
+ }
+
+ private static FileIO getFileIOFromOptions(CatalogContext context) {
+ try {
+ Options options = context.options();
+ String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
+ Path warehousePath = new Path(warehouseStr);
+ CatalogContext contextWithNewOptions =
+ CatalogContext.create(options, context.preferIO(),
context.fallbackIO());
+ return FileIO.get(warehousePath, contextWithNewOptions);
+ } catch (IOException e) {
+ LOG.warn("Can not get FileIO from options.");
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
index a255d688bc..d90cb5fa4a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
@@ -30,4 +30,6 @@ public interface RESTClient extends Closeable {
String path, RESTRequest body, Class<T> responseType, Map<String,
String> headers);
<T extends RESTResponse> T delete(String path, Map<String, String>
headers);
+
+ <T extends RESTResponse> T delete(String path, RESTRequest body,
Map<String, String> headers);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index f006713fe2..ebfdd2db1e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -82,4 +82,15 @@ public class ResourcePaths {
.add("rename")
.toString();
}
+
+ public String partitions(String databaseName, String tableName) {
+ return SLASH.add("v1")
+ .add(prefix)
+ .add("databases")
+ .add(databaseName)
+ .add("tables")
+ .add(tableName)
+ .add("partitions")
+ .toString();
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreatePartitionRequest.java b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreatePartitionRequest.java
new file mode 100644
index 0000000000..e8094ab821
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreatePartitionRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.rest.RESTRequest;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+/** Request for creating partition. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CreatePartitionRequest implements RESTRequest {
+
+ private static final String FIELD_IDENTIFIER = "identifier";
+ private static final String FIELD_PARTITION_SPEC = "spec";
+
+ @JsonProperty(FIELD_IDENTIFIER)
+ private final Identifier identifier;
+
+ @JsonProperty(FIELD_PARTITION_SPEC)
+ private final Map<String, String> partitionSpec;
+
+ @JsonCreator
+ public CreatePartitionRequest(
+ @JsonProperty(FIELD_IDENTIFIER) Identifier identifier,
+ @JsonProperty(FIELD_PARTITION_SPEC) Map<String, String>
partitionSpec) {
+ this.identifier = identifier;
+ this.partitionSpec = partitionSpec;
+ }
+
+ @JsonGetter(FIELD_IDENTIFIER)
+ public Identifier getIdentifier() {
+ return identifier;
+ }
+
+ @JsonGetter(FIELD_PARTITION_SPEC)
+ public Map<String, String> getPartitionSpec() {
+ return partitionSpec;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/requests/DropPartitionRequest.java b/paimon-core/src/main/java/org/apache/paimon/rest/requests/DropPartitionRequest.java
new file mode 100644
index 0000000000..4fabf11636
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/requests/DropPartitionRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.rest.RESTRequest;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+/** Request for deleting partition. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DropPartitionRequest implements RESTRequest {
+
+ private static final String FIELD_PARTITION_SPEC = "spec";
+
+ @JsonProperty(FIELD_PARTITION_SPEC)
+ private final Map<String, String> partitionSpec;
+
+ @JsonCreator
+ public DropPartitionRequest(
+ @JsonProperty(FIELD_PARTITION_SPEC) Map<String, String>
partitionSpec) {
+ this.partitionSpec = partitionSpec;
+ }
+
+ @JsonGetter(FIELD_PARTITION_SPEC)
+ public Map<String, String> getPartitionSpec() {
+ return partitionSpec;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java
new file mode 100644
index 0000000000..1f194d208e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.rest.RESTResponse;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Response for listing partitions. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ListPartitionsResponse implements RESTResponse {
+
+ public static final String FIELD_PARTITIONS = "partitions";
+
+ @JsonProperty(FIELD_PARTITIONS)
+ private final List<PartitionResponse> partitions;
+
+ @JsonCreator
+ public ListPartitionsResponse(
+ @JsonProperty(FIELD_PARTITIONS) List<PartitionResponse>
partitions) {
+ this.partitions = partitions;
+ }
+
+ @JsonGetter(FIELD_PARTITIONS)
+ public List<PartitionResponse> getPartitions() {
+ return partitions;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/PartitionResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/PartitionResponse.java
new file mode 100644
index 0000000000..2706b5d7da
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/PartitionResponse.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.rest.RESTResponse;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+/** Partition for rest api. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PartitionResponse implements RESTResponse {
+
+ public static final String FIELD_SPEC = "spec";
+ public static final String FIELD_RECORD_COUNT = "recordCount";
+ public static final String FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes";
+ public static final String FIELD_FILE_COUNT = "fileCount";
+ public static final String FIELD_LAST_FILE_CREATION_TIME =
"lastFileCreationTime";
+
+ @JsonProperty(FIELD_SPEC)
+ private final Map<String, String> spec;
+
+ @JsonProperty(FIELD_RECORD_COUNT)
+ private final long recordCount;
+
+ @JsonProperty(FIELD_FILE_SIZE_IN_BYTES)
+ private final long fileSizeInBytes;
+
+ @JsonProperty(FIELD_FILE_COUNT)
+ private final long fileCount;
+
+ @JsonProperty(FIELD_LAST_FILE_CREATION_TIME)
+ private final long lastFileCreationTime;
+
+ @JsonCreator
+ public PartitionResponse(
+ @JsonProperty(FIELD_SPEC) Map<String, String> spec,
+ @JsonProperty(FIELD_RECORD_COUNT) long recordCount,
+ @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes,
+ @JsonProperty(FIELD_FILE_COUNT) long fileCount,
+ @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long
lastFileCreationTime) {
+ this.spec = spec;
+ this.recordCount = recordCount;
+ this.fileSizeInBytes = fileSizeInBytes;
+ this.fileCount = fileCount;
+ this.lastFileCreationTime = lastFileCreationTime;
+ }
+
+ @JsonGetter(FIELD_SPEC)
+ public Map<String, String> getSpec() {
+ return spec;
+ }
+
+ @JsonGetter(FIELD_RECORD_COUNT)
+ public long getRecordCount() {
+ return recordCount;
+ }
+
+ @JsonGetter(FIELD_FILE_SIZE_IN_BYTES)
+ public long getFileSizeInBytes() {
+ return fileSizeInBytes;
+ }
+
+ @JsonGetter(FIELD_FILE_COUNT)
+ public long getFileCount() {
+ return fileCount;
+ }
+
+ @JsonGetter(FIELD_LAST_FILE_CREATION_TIME)
+ public long getLastFileCreationTime() {
+ return lastFileCreationTime;
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
index 96d0c9d7c7..9b686b6837 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
@@ -18,11 +18,14 @@
package org.apache.paimon.rest;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreatePartitionRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.DropPartitionRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.CreateDatabaseResponse;
@@ -30,7 +33,9 @@ import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListPartitionsResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.rest.responses.PartitionResponse;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.DataField;
@@ -39,6 +44,7 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import java.util.ArrayList;
@@ -131,6 +137,26 @@ public class MockRESTMessage {
return new AlterTableRequest(getChanges());
}
+ public static CreatePartitionRequest createPartitionRequest(String
tableName) {
+ Identifier identifier = Identifier.create(databaseName(), tableName);
+ return new CreatePartitionRequest(identifier,
Collections.singletonMap("pt", "1"));
+ }
+
+ public static DropPartitionRequest dropPartitionRequest() {
+ return new DropPartitionRequest(Collections.singletonMap("pt", "1"));
+ }
+
+ public static PartitionResponse partitionResponse() {
+ Map<String, String> spec = new HashMap<>();
+ spec.put("f0", "1");
+ return new PartitionResponse(spec, 1, 1, 1, 1);
+ }
+
+ public static ListPartitionsResponse listPartitionsResponse() {
+ PartitionResponse partition = partitionResponse();
+ return new ListPartitionsResponse(ImmutableList.of(partition));
+ }
+
public static List<SchemaChange> getChanges() {
// add option
SchemaChange addOption =
SchemaChange.setOption("snapshot.time-retained", "2h");
@@ -202,20 +228,27 @@ public class MockRESTMessage {
return schemaChanges;
}
+ public static GetTableResponse getTableResponseEnablePartition() {
+ Map<String, String> options = new HashMap<>();
+ options.put("option-1", "value-1");
+ options.put(CoreOptions.METASTORE_PARTITIONED_TABLE.key(), "true");
+ return new GetTableResponse("/tmp/2", 1, schema(options));
+ }
+
public static GetTableResponse getTableResponse() {
- return new GetTableResponse("/tmp/1", 1, schema());
+ Map<String, String> options = new HashMap<>();
+ options.put("option-1", "value-1");
+ options.put("option-2", "value-2");
+ return new GetTableResponse("/tmp/1", 1, schema(options));
}
- private static Schema schema() {
+ private static Schema schema(Map<String, String> options) {
List<DataField> fields =
Arrays.asList(
new DataField(0, "f0", new IntType()),
new DataField(1, "f1", new IntType()));
List<String> partitionKeys = Collections.singletonList("f0");
List<String> primaryKeys = Arrays.asList("f0", "f1");
- Map<String, String> options = new HashMap<>();
- options.put("option-1", "value-1");
- options.put("option-2", "value-2");
return new Schema(fields, partitionKeys, primaryKeys, options,
"comment");
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 01555adc3d..67103aaa52 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.requests.CreateTableRequest;
@@ -31,7 +32,9 @@ import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListPartitionsResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.rest.responses.PartitionResponse;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
@@ -55,13 +58,6 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
/** Test for REST Catalog. */
public class RESTCatalogTest {
@@ -69,31 +65,19 @@ public class RESTCatalogTest {
private final ObjectMapper mapper = RESTObjectMapper.create();
private MockWebServer mockWebServer;
private RESTCatalog restCatalog;
- private RESTCatalog mockRestCatalog;
private String warehouseStr;
+ private String serverUrl;
@Rule public TemporaryFolder folder = new TemporaryFolder();
@Before
public void setUp() throws IOException {
mockWebServer = new MockWebServer();
mockWebServer.start();
- String baseUrl = mockWebServer.url("").toString();
- Options options = new Options();
- options.set(RESTCatalogOptions.URI, baseUrl);
- String initToken = "init_token";
- options.set(RESTCatalogOptions.TOKEN, initToken);
- options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
+ serverUrl = mockWebServer.url("").toString();
+ Options options = mockInitOptions();
warehouseStr = folder.getRoot().getPath();
- String mockResponse =
- String.format(
- "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}",
- RESTCatalogInternalOptions.PREFIX.key(),
- "prefix",
- CatalogOptions.WAREHOUSE.key(),
- warehouseStr);
- mockResponse(mockResponse, 200);
+ mockConfig(warehouseStr);
restCatalog = new RESTCatalog(CatalogContext.create(options));
- mockRestCatalog = spy(restCatalog);
}
@After
@@ -154,9 +138,7 @@ public class RESTCatalogTest {
public void testDropDatabase() throws Exception {
String name = MockRESTMessage.databaseName();
mockResponse("", 200);
- assertDoesNotThrow(() -> mockRestCatalog.dropDatabase(name, false,
true));
- verify(mockRestCatalog, times(1)).dropDatabase(eq(name), eq(false),
eq(true));
- verify(mockRestCatalog, times(0)).listTables(eq(name));
+ assertDoesNotThrow(() -> restCatalog.dropDatabase(name, false, true));
}
@Test
@@ -166,7 +148,7 @@ public class RESTCatalogTest {
mockResponse(mapper.writeValueAsString(response), 404);
assertThrows(
Catalog.DatabaseNotExistException.class,
- () -> mockRestCatalog.dropDatabase(name, false, true));
+ () -> restCatalog.dropDatabase(name, false, true));
}
@Test
@@ -174,9 +156,7 @@ public class RESTCatalogTest {
String name = MockRESTMessage.databaseName();
ErrorResponse response =
MockRESTMessage.noSuchResourceExceptionErrorResponse();
mockResponse(mapper.writeValueAsString(response), 404);
- assertDoesNotThrow(() -> mockRestCatalog.dropDatabase(name, true,
true));
- verify(mockRestCatalog, times(1)).dropDatabase(eq(name), eq(true),
eq(true));
- verify(mockRestCatalog, times(0)).listTables(eq(name));
+ assertDoesNotThrow(() -> restCatalog.dropDatabase(name, true, true));
}
@Test
@@ -186,9 +166,7 @@ public class RESTCatalogTest {
ListTablesResponse response =
MockRESTMessage.listTablesEmptyResponse();
mockResponse(mapper.writeValueAsString(response), 200);
mockResponse("", 200);
- assertDoesNotThrow(() -> mockRestCatalog.dropDatabase(name, false,
cascade));
- verify(mockRestCatalog, times(1)).dropDatabase(eq(name), eq(false),
eq(cascade));
- verify(mockRestCatalog, times(1)).listTables(eq(name));
+ assertDoesNotThrow(() -> restCatalog.dropDatabase(name, false,
cascade));
}
@Test
@@ -199,9 +177,7 @@ public class RESTCatalogTest {
mockResponse(mapper.writeValueAsString(response), 200);
assertThrows(
Catalog.DatabaseNotEmptyException.class,
- () -> mockRestCatalog.dropDatabase(name, false, cascade));
- verify(mockRestCatalog, times(1)).dropDatabase(eq(name), eq(false),
eq(cascade));
- verify(mockRestCatalog, times(1)).listTables(eq(name));
+ () -> restCatalog.dropDatabase(name, false, cascade));
}
@Test
@@ -209,7 +185,7 @@ public class RESTCatalogTest {
String name = MockRESTMessage.databaseName();
AlterDatabaseResponse response =
MockRESTMessage.alterDatabaseResponse();
mockResponse(mapper.writeValueAsString(response), 200);
- assertDoesNotThrow(() -> mockRestCatalog.alterDatabase(name, new
ArrayList<>(), true));
+ assertDoesNotThrow(() -> restCatalog.alterDatabase(name, new
ArrayList<>(), true));
}
@Test
@@ -220,7 +196,7 @@ public class RESTCatalogTest {
mockResponse(mapper.writeValueAsString(response), 404);
assertThrows(
Catalog.DatabaseNotExistException.class,
- () -> mockRestCatalog.alterDatabase(name, new ArrayList<>(),
false));
+ () -> restCatalog.alterDatabase(name, new ArrayList<>(),
false));
}
@Test
@@ -228,7 +204,7 @@ public class RESTCatalogTest {
String name = MockRESTMessage.databaseName();
ErrorResponse response =
MockRESTMessage.noSuchResourceExceptionErrorResponse();
mockResponse(mapper.writeValueAsString(response), 404);
- assertDoesNotThrow(() -> mockRestCatalog.alterDatabase(name, new
ArrayList<>(), true));
+ assertDoesNotThrow(() -> restCatalog.alterDatabase(name, new
ArrayList<>(), true));
}
@Test
@@ -245,10 +221,8 @@ public class RESTCatalogTest {
String databaseName = MockRESTMessage.databaseName();
GetTableResponse response = MockRESTMessage.getTableResponse();
mockResponse(mapper.writeValueAsString(response), 200);
- Table result =
mockRestCatalog.getTable(Identifier.create(databaseName, "table"));
- // catalog will add path option
+ Table result = restCatalog.getTable(Identifier.create(databaseName,
"table"));
assertEquals(response.getSchema().options().size() + 1,
result.options().size());
- verify(mockRestCatalog, times(1)).getDataOrFormatTable(any());
}
@Test
@@ -278,11 +252,10 @@ public class RESTCatalogTest {
mockResponse(mapper.writeValueAsString(response), 200);
assertDoesNotThrow(
() ->
- mockRestCatalog.renameTable(
+ restCatalog.renameTable(
Identifier.create(databaseName, fromTableName),
Identifier.create(databaseName, toTableName),
true));
- verify(mockRestCatalog, times(1)).renameTable(any(), any(),
anyBoolean());
}
@Test
@@ -294,7 +267,7 @@ public class RESTCatalogTest {
assertThrows(
Catalog.TableNotExistException.class,
() ->
- mockRestCatalog.renameTable(
+ restCatalog.renameTable(
Identifier.create(databaseName, fromTableName),
Identifier.create(databaseName, toTableName),
false));
@@ -309,7 +282,7 @@ public class RESTCatalogTest {
assertThrows(
Catalog.TableAlreadyExistException.class,
() ->
- mockRestCatalog.renameTable(
+ restCatalog.renameTable(
Identifier.create(databaseName, fromTableName),
Identifier.create(databaseName, toTableName),
false));
@@ -322,10 +295,7 @@ public class RESTCatalogTest {
GetTableResponse response = MockRESTMessage.getTableResponse();
mockResponse(mapper.writeValueAsString(response), 200);
assertDoesNotThrow(
- () ->
- mockRestCatalog.alterTable(
- Identifier.create(databaseName, "t1"),
changes, true));
- verify(mockRestCatalog, times(1)).alterTable(any(), anyList(),
anyBoolean());
+ () -> restCatalog.alterTable(Identifier.create(databaseName,
"t1"), changes, true));
}
@Test
@@ -336,7 +306,7 @@ public class RESTCatalogTest {
assertThrows(
Catalog.TableNotExistException.class,
() ->
- mockRestCatalog.alterTable(
+ restCatalog.alterTable(
Identifier.create(databaseName, "t1"),
changes, false));
}
@@ -359,6 +329,127 @@ public class RESTCatalogTest {
() -> restCatalog.dropTable(Identifier.create(databaseName,
tableName), false));
}
+ @Test // createPartition must not throw when the queued reply is a 200 with a serialized PartitionResponse
+ public void testCreatePartition() throws Exception {
+ String databaseName = MockRESTMessage.databaseName();
+ Map<String, String> partitionSpec = new HashMap<>();
+ partitionSpec.put("p1", "v1");
+ PartitionResponse response = MockRESTMessage.partitionResponse();
+ mockResponse(mapper.writeValueAsString(response), 200); // the only reply queued for this test
+ assertDoesNotThrow(
+ () ->
+ restCatalog.createPartition(
+ Identifier.create(databaseName, "table"),
partitionSpec));
+ }
+
+ @Test // a 404 reply to createPartition must surface as Catalog.TableNotExistException
+ public void testCreatePartitionWhenTableNotExist() throws Exception {
+ String databaseName = MockRESTMessage.databaseName();
+ Map<String, String> partitionSpec = new HashMap<>();
+ partitionSpec.put("p1", "v1");
+ mockResponse("", 404); // empty body, HTTP 404
+ assertThrows(
+ Catalog.TableNotExistException.class,
+ () ->
+ restCatalog.createPartition(
+ Identifier.create(databaseName, "table"),
partitionSpec));
+ }
+
+ @Test // a 403 reply to createPartition must surface as Catalog.TableNoPermissionException
+ public void testCreatePartitionWhenTableNoPermissionException() throws
Exception {
+ String databaseName = MockRESTMessage.databaseName();
+ Map<String, String> partitionSpec = new HashMap<>();
+ partitionSpec.put("p1", "v1");
+ mockResponse("", 403); // empty body, HTTP 403
+ assertThrows(
+ Catalog.TableNoPermissionException.class,
+ () ->
+ restCatalog.createPartition(
+ Identifier.create(databaseName, "table"),
partitionSpec));
+ }
+
+ @Test // NOTE(review): named like a happy path, yet it asserts that dropPartition throws RuntimeException - confirm intended
+ public void testDropPartition() throws Exception {
+ String databaseName = MockRESTMessage.databaseName();
+ Map<String, String> partitionSpec = new HashMap<>();
+ GetTableResponse response = MockRESTMessage.getTableResponse();
+ partitionSpec.put(response.getSchema().primaryKeys().get(0), "1"); // spec keyed by the mock schema's first primary key
+ mockResponse(mapper.writeValueAsString(""), 200); // first queued reply: JSON-encoded empty string
+ mockResponse(mapper.writeValueAsString(response), 200); // second queued reply: the mock table definition
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ restCatalog.dropPartition(
+ Identifier.create(databaseName, "table"),
partitionSpec));
+ }
+
+ @Test // a 404 as the first reply to dropPartition must surface as Catalog.PartitionNotExistException
+ public void testDropPartitionWhenPartitionNoExist() throws Exception {
+ String databaseName = MockRESTMessage.databaseName();
+ Map<String, String> partitionSpec = new HashMap<>();
+ GetTableResponse response = MockRESTMessage.getTableResponse();
+ partitionSpec.put(response.getSchema().primaryKeys().get(0), "1"); // spec keyed by the mock schema's first primary key
+ mockResponse(mapper.writeValueAsString(""), 404); // first queued reply: HTTP 404
+ mockResponse(mapper.writeValueAsString(response), 200); // second queued reply: the mock table definition
+ assertThrows(
+ Catalog.PartitionNotExistException.class,
+ () ->
+ restCatalog.dropPartition(
+ Identifier.create(databaseName, "table"),
partitionSpec));
+ }
+
+ @Test // a 403 reply to dropPartition must surface as Catalog.TableNoPermissionException
+ public void testDropPartitionWhenTableNoPermission() throws Exception {
+ String databaseName = MockRESTMessage.databaseName();
+ Map<String, String> partitionSpec = new HashMap<>();
+ GetTableResponse response = MockRESTMessage.getTableResponse();
+ partitionSpec.put(response.getSchema().primaryKeys().get(0), "1"); // spec keyed by the mock schema's first primary key
+ mockResponse(mapper.writeValueAsString(""), 403); // the only queued reply: HTTP 403
+ assertThrows(
+ Catalog.TableNoPermissionException.class,
+ () ->
+ restCatalog.dropPartition(
+ Identifier.create(databaseName, "table"),
partitionSpec));
+ }
+
+ @Test // a 200 followed by a 404 must make dropPartition throw Catalog.TableNotExistException
+ public void testDropPartitionWhenTableNoExist() throws Exception {
+ String databaseName = MockRESTMessage.databaseName();
+ Map<String, String> partitionSpec = new HashMap<>();
+ GetTableResponse response = MockRESTMessage.getTableResponse();
+ partitionSpec.put(response.getSchema().primaryKeys().get(0), "1"); // spec keyed by the mock schema's first primary key
+ mockResponse(mapper.writeValueAsString(""), 200); // first queued reply: HTTP 200
+ mockResponse("", 404); // second queued reply: empty body, HTTP 404
+ assertThrows(
+ Catalog.TableNotExistException.class,
+ () ->
+ restCatalog.dropPartition(
+ Identifier.create(databaseName, "table"),
partitionSpec));
+ }
+
+ @Test // listPartitions must return as many entries as the queued ListPartitionsResponse contains
+ public void testListPartitionsWhenMetastorePartitionedIsTrue() throws
Exception {
+ String databaseName = MockRESTMessage.databaseName();
+ GetTableResponse getTableResponse =
MockRESTMessage.getTableResponseEnablePartition();
+ mockResponse(mapper.writeValueAsString(getTableResponse), 200); // first queued reply: table definition (presumably metastore-partitioned, per the helper name - verify)
+ ListPartitionsResponse response =
MockRESTMessage.listPartitionsResponse();
+ mockResponse(mapper.writeValueAsString(response), 200); // second queued reply: the partition list
+ List<PartitionEntry> result =
+ restCatalog.listPartitions(Identifier.create(databaseName,
"table"));
+ assertEquals(response.getPartitions().size(), result.size());
+ }
+
+ @Test // with the same mock table reply queued twice, listPartitions must yield no entries
+ public void testListPartitionsFromFile() throws Exception {
+ String databaseName = MockRESTMessage.databaseName();
+ GetTableResponse response = MockRESTMessage.getTableResponse();
+ mockResponse(mapper.writeValueAsString(response), 200);
+ mockResponse(mapper.writeValueAsString(response), 200);
+ List<PartitionEntry> partitionEntries =
+ restCatalog.listPartitions(Identifier.create(databaseName,
"table"));
+ assertEquals(0, partitionEntries.size()); // JUnit argument order is (expected, actual)
+ }
+
private void mockResponse(String mockResponse, int httpCode) {
MockResponse mockResponseObj =
new MockResponse()
@@ -367,4 +458,24 @@ public class RESTCatalogTest {
.addHeader("Content-Type", "application/json");
mockWebServer.enqueue(mockResponseObj);
}
+
+ private void mockConfig(String warehouseStr) { // queues a 200 reply whose "defaults" map carries the prefix and warehouse options
+ String mockResponse =
+ String.format(
+ "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}",
+ RESTCatalogInternalOptions.PREFIX.key(),
+ "prefix",
+ CatalogOptions.WAREHOUSE.key(),
+ warehouseStr);
+ mockResponse(mockResponse, 200); // setUp calls this right before constructing the RESTCatalog
+ }
+
+ public Options mockInitOptions() { // builds the catalog options used by setUp: server URI, init token, thread pool size 1
+ Options options = new Options();
+ options.set(RESTCatalogOptions.URI, serverUrl); // serverUrl must already be set from the started MockWebServer
+ String initToken = "init_token";
+ options.set(RESTCatalogOptions.TOKEN, initToken);
+ options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
+ return options;
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
index 26b3db615d..6712b7b991 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
@@ -21,7 +21,9 @@ package org.apache.paimon.rest;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreatePartitionRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.DropPartitionRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.ConfigResponse;
@@ -30,11 +32,14 @@ import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListPartitionsResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.rest.responses.PartitionResponse;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
@@ -44,6 +49,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
/** Test for {@link RESTObjectMapper}. */
public class RESTObjectMapperTest {
@@ -193,4 +199,43 @@ public class RESTObjectMapperTest {
AlterTableRequest parseData = mapper.readValue(requestStr,
AlterTableRequest.class);
assertEquals(parseData.getChanges().size(),
parseData.getChanges().size());
}
+
+ @Test // round-trips a CreatePartitionRequest through JSON and compares the result with the source request
+ public void createPartitionRequestParseTest() throws
JsonProcessingException {
+ CreatePartitionRequest request =
MockRESTMessage.createPartitionRequest("t1");
+ String requestStr = mapper.writeValueAsString(request);
+ CreatePartitionRequest parseData =
+ mapper.readValue(requestStr, CreatePartitionRequest.class);
+ assertEquals(request.getIdentifier(), parseData.getIdentifier()); // expected side is the source request, not parseData itself
+ assertEquals(request.getPartitionSpec(),
parseData.getPartitionSpec());
+ }
+
+ @Test // round-trips a DropPartitionRequest through JSON and compares the result with the source request
+ public void dropPartitionRequestParseTest() throws JsonProcessingException
{
+ DropPartitionRequest request = MockRESTMessage.dropPartitionRequest();
+ String requestStr = mapper.writeValueAsString(request);
+ DropPartitionRequest parseData = mapper.readValue(requestStr,
DropPartitionRequest.class);
+ assertEquals(request.getPartitionSpec(), // expected side is the source request, not parseData itself
parseData.getPartitionSpec());
+ }
+
+ @Test // round-trips a ListPartitionsResponse through JSON and compares the first partition's file count
+ public void listPartitionsResponseParseTest() throws Exception {
+ ListPartitionsResponse response =
MockRESTMessage.listPartitionsResponse();
+ String responseStr = mapper.writeValueAsString(response);
+ ListPartitionsResponse parseData =
+ mapper.readValue(responseStr, ListPartitionsResponse.class);
+ assertEquals(
+ response.getPartitions().get(0).getFileCount(), // only element 0 is checked; the mock list must be non-empty
+ parseData.getPartitions().get(0).getFileCount());
+ }
+
+ @Test // checks only that a PartitionResponse serializes and deserializes without throwing; no field values are compared
+ public void partitionResponseParseTest() throws Exception {
+ PartitionResponse response = MockRESTMessage.partitionResponse();
+ assertDoesNotThrow(() -> mapper.writeValueAsString(response));
+ assertDoesNotThrow(
+ () ->
+ mapper.readValue(
+ mapper.writeValueAsString(response),
PartitionResponse.class));
+ }
}
diff --git a/paimon-open-api/rest-catalog-open-api.yaml
b/paimon-open-api/rest-catalog-open-api.yaml
index 7fefd0254b..7a0c9663b4 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -28,6 +28,21 @@ servers:
- url: http://localhost:8080
description: Server URL in Development environment
paths:
+ /v1/config:
+ get:
+ tags:
+ - config
+ summary: Get Config
+ operationId: getConfig
+ responses:
+ "200":
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ConfigResponse'
+ "500":
+ description: Internal Server Error
/v1/{prefix}/databases:
get:
tags:
@@ -80,6 +95,102 @@ paths:
$ref: '#/components/schemas/ErrorResponse'
"500":
description: Internal Server Error
+ /v1/{prefix}/databases/{database}:
+ get:
+ tags:
+ - database
+ summary: Get Database
+ operationId: getDatabases
+ parameters:
+ - name: prefix
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: database
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ "200":
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/GetDatabaseResponse'
+ "404":
+ description: Resource not found
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ErrorResponse'
+ "500":
+ description: Internal Server Error
+ delete:
+ tags:
+ - database
+ summary: Drop Database
+ operationId: dropDatabase
+ parameters:
+ - name: prefix
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: database
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ "200":
+ description: Success, no content
+ "404":
+ description: Resource not found
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ErrorResponse'
+ "500":
+ description: Internal Server Error
+ /v1/{prefix}/databases/{database}/properties:
+ post:
+ tags:
+ - database
+ summary: Alter Database
+ operationId: alterDatabase
+ parameters:
+ - name: prefix
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: database
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/AlterDatabaseRequest'
+ responses:
+ "200":
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/AlterDatabaseResponse'
+ "404":
+ description: Resource not found
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ErrorResponse'
+ "500":
+ description: Internal Server Error
/v1/{prefix}/databases/{database}/tables:
get:
tags:
@@ -237,6 +348,8 @@ paths:
schema:
type: string
responses:
+ "200":
+ description: Success, no content
"404":
description: Resource not found
content:
@@ -293,12 +406,12 @@ paths:
$ref: '#/components/schemas/ErrorResponse'
"500":
description: Internal Server Error
- /v1/{prefix}/databases/{database}/properties:
- post:
+ /v1/{prefix}/databases/{database}/tables/{table}/partitions:
+ get:
tags:
- - database
- summary: Alter Database
- operationId: alterDatabase
+ - partition
+ summary: List partitions
+ operationId: listPartitions
parameters:
- name: prefix
in: path
@@ -310,18 +423,18 @@ paths:
required: true
schema:
type: string
- requestBody:
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/AlterDatabaseRequest'
+ - name: table
+ in: path
+ required: true
+ schema:
+ type: string
responses:
"200":
description: OK
content:
application/json:
schema:
- $ref: '#/components/schemas/AlterDatabaseResponse'
+ $ref: '#/components/schemas/ListPartitionsResponse'
"404":
description: Resource not found
content:
@@ -330,12 +443,11 @@ paths:
$ref: '#/components/schemas/ErrorResponse'
"500":
description: Internal Server Error
- /v1/{prefix}/databases/{database}:
- get:
+ post:
tags:
- - database
- summary: Get Database
- operationId: getDatabases
+ - partition
+ summary: Create partition
+ operationId: createPartition
parameters:
- name: prefix
in: path
@@ -347,13 +459,23 @@ paths:
required: true
schema:
type: string
+ - name: table
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/CreatePartitionRequest'
responses:
"200":
description: OK
content:
application/json:
schema:
- $ref: '#/components/schemas/GetDatabaseResponse'
+ $ref: '#/components/schemas/PartitionResponse'
"404":
description: Resource not found
content:
@@ -364,9 +486,9 @@ paths:
description: Internal Server Error
delete:
tags:
- - database
- summary: Drop Database
- operationId: dropDatabase
+ - partition
+ summary: Drop partition
+ operationId: dropPartition
parameters:
- name: prefix
in: path
@@ -378,7 +500,19 @@ paths:
required: true
schema:
type: string
+ - name: table
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/DropPartitionRequest'
responses:
+ "200":
+ description: Success, no content
"404":
description: Resource not found
content:
@@ -387,21 +521,6 @@ paths:
$ref: '#/components/schemas/ErrorResponse'
"500":
description: Internal Server Error
- /v1/config:
- get:
- tags:
- - config
- summary: Get Config
- operationId: getConfig
- responses:
- "200":
- description: OK
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/ConfigResponse'
- "500":
- description: Internal Server Error
components:
schemas:
CreateDatabaseRequest:
@@ -413,6 +532,18 @@ components:
type: object
additionalProperties:
type: string
+ CreatePartitionRequest:
+ type: object
+ properties:
+ identifier:
+ $ref: '#/components/schemas/Identifier'
+ spec:
+ type: object
+ DropPartitionRequest:
+ type: object
+ properties:
+ spec:
+ type: object
CreateDatabaseResponse:
type: object
properties:
@@ -468,39 +599,38 @@ components:
type:
type: string
pattern: ^ARRAY.*
+ example: ARRAY
element:
- type:
- $ref: '#/components/schemas/DataType'
+ $ref: '#/components/schemas/DataType'
MultisetType:
type: object
properties:
type:
type: string
pattern: ^MULTISET.*
+ example: MULTISET
element:
- type:
- $ref: '#/components/schemas/DataType'
+ $ref: '#/components/schemas/DataType'
MapType:
type: object
properties:
type:
type: string
pattern: ^MAP.*
+ example: MAP
key:
- type:
- $ref: '#/components/schemas/DataType'
+ $ref: '#/components/schemas/DataType'
value:
- type:
- $ref: '#/components/schemas/DataType'
+ $ref: '#/components/schemas/DataType'
RowType:
type: object
properties:
type:
type: string
pattern: ^ROW.*
+ example: ROW
fields:
- type:
- $ref: '#/components/schemas/DataField'
+ $ref: '#/components/schemas/DataField'
Identifier:
type: object
properties:
@@ -744,7 +874,30 @@ components:
type: object
additionalProperties:
type: string
-
+ ListPartitionsResponse:
+ type: object
+ properties:
+ partitions:
+ type: array
+ items:
+ $ref: '#/components/schemas/PartitionResponse'
+ PartitionResponse:
+ type: object
+ properties:
+ spec:
+ type: object
+ recordCount:
+ type: integer
+ format: int64
+ fileSizeInBytes:
+ type: integer
+ format: int64
+ fileCount:
+ type: integer
+ format: int64
+ lastFileCreationTime:
+ type: integer
+ format: int64
securitySchemes:
BearerAuth:
type: http
diff --git
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
index 179c23ce46..62f99876a3 100644
---
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
+++
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
@@ -22,7 +22,9 @@ import org.apache.paimon.rest.ResourcePaths;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreatePartitionRequest;
import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.DropPartitionRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.ConfigResponse;
@@ -31,7 +33,9 @@ import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListPartitionsResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.rest.responses.PartitionResponse;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -143,6 +147,7 @@ public class RESTCatalogController {
summary = "Drop Database",
tags = {"database"})
@ApiResponses({
+ @ApiResponse(responseCode = "200", description = "Success, no
content"),
@ApiResponse(
responseCode = "404",
description = "Resource not found",
@@ -301,6 +306,7 @@ public class RESTCatalogController {
summary = "Drop table",
tags = {"table"})
@ApiResponses({
+ @ApiResponse(responseCode = "200", description = "Success, no
content"),
@ApiResponse(
responseCode = "404",
description = "Resource not found",
@@ -346,4 +352,78 @@ public class RESTCatalogController {
new HashMap<>(),
"comment"));
}
+
+ @Operation( // stub GET endpoint: the body ignores its path variables and returns a fixed sample
+ summary = "List partitions",
+ tags = {"partition"})
+ @ApiResponses({
+ @ApiResponse(
+ responseCode = "200",
+ content = {
+ @Content(schema = @Schema(implementation =
ListPartitionsResponse.class))
+ }),
+ @ApiResponse(
+ responseCode = "404",
+ description = "Resource not found",
+ content = {@Content(schema = @Schema(implementation =
ErrorResponse.class))}),
+ @ApiResponse(
+ responseCode = "500",
+ content = {@Content(schema = @Schema())})
+ })
+ @GetMapping("/v1/{prefix}/databases/{database}/tables/{table}/partitions")
+ public ListPartitionsResponse listPartitions(
+ @PathVariable String prefix,
+ @PathVariable String database,
+ @PathVariable String table) {
+ Map<String, String> spec = new HashMap<>();
+ spec.put("f1", "1");
+ PartitionResponse partition = new PartitionResponse(spec, 1, 2, 3, 4); // hard-coded placeholder values
+ return new ListPartitionsResponse(ImmutableList.of(partition)); // always a single-element list
+ }
+
+ @Operation( // stub POST endpoint: the body ignores the request and path variables and returns a fixed sample
+ summary = "Create partition",
+ tags = {"partition"})
+ @ApiResponses({
+ @ApiResponse(
+ responseCode = "200",
+ content = {@Content(schema = @Schema(implementation =
PartitionResponse.class))}),
+ @ApiResponse(
+ responseCode = "404",
+ description = "Resource not found",
+ content = {@Content(schema = @Schema(implementation =
ErrorResponse.class))}),
+ @ApiResponse(
+ responseCode = "500",
+ content = {@Content(schema = @Schema())})
+ })
+ @PostMapping("/v1/{prefix}/databases/{database}/tables/{table}/partitions")
+ public PartitionResponse createPartition(
+ @PathVariable String prefix,
+ @PathVariable String database,
+ @PathVariable String table,
+ @RequestBody CreatePartitionRequest request) {
+ Map<String, String> spec = new HashMap<>();
+ spec.put("f1", "1");
+ return new PartitionResponse(spec, 0, 0, 0, 4); // hard-coded placeholder values, not derived from the request
+ }
+
+ @Operation( // stub DELETE endpoint: the method body is empty and returns nothing
+ summary = "Drop partition",
+ tags = {"partition"})
+ @ApiResponses({
+ @ApiResponse(responseCode = "200", description = "Success, no
content"),
+ @ApiResponse(
+ responseCode = "404",
+ description = "Resource not found",
+ content = {@Content(schema = @Schema(implementation =
ErrorResponse.class))}),
+ @ApiResponse(
+ responseCode = "500",
+ content = {@Content(schema = @Schema())})
+ })
+
@DeleteMapping("/v1/{prefix}/databases/{database}/tables/{table}/partitions")
+ public void dropPartition(
+ @PathVariable String prefix,
+ @PathVariable String database,
+ @PathVariable String table,
+ @RequestBody DropPartitionRequest request) {} // empty body: the stub performs no action
}