This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ce0f6bf770 [core] RESTCatalog refactor RESTCatalogServer and add test
(#5143)
ce0f6bf770 is described below
commit ce0f6bf770a31e8757a42e4e75e325a85e175d19
Author: jerry <[email protected]>
AuthorDate: Fri Feb 28 14:13:48 2025 +0800
[core] RESTCatalog refactor RESTCatalogServer and add test (#5143)
---
docs/content/spark/sql-ddl.md | 44 ++
.../org/apache/paimon/rest/DataTokenProvider.java | 61 ++
.../rest/MetadataInMemoryFileSystemCatalog.java | 443 -------------
.../org/apache/paimon/rest/RESTCatalogServer.java | 725 +++++++++++++--------
.../org/apache/paimon/rest/RESTCatalogTest.java | 120 +++-
.../org/apache/paimon/flink/RESTCatalogITCase.java | 10 +
paimon-spark/paimon-spark-ut/pom.xml | 7 +
.../paimon/spark/SparkCatalogWithRestTest.java | 84 +++
8 files changed, 781 insertions(+), 713 deletions(-)
diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md
index cfe105f6ac..8be5304d54 100644
--- a/docs/content/spark/sql-ddl.md
+++ b/docs/content/spark/sql-ddl.md
@@ -116,6 +116,50 @@ spark-sql ... \
```
+```sql
+USE paimon.default;
+```
+#### Creating REST Catalog
+
+By using the Paimon REST catalog, changes to the catalog will be stored directly
on the remote server.
+
+##### bear token
+```bash
+spark-sql ... \
+ --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
+ --conf spark.sql.catalog.paimon.metastore=rest \
+ --conf spark.sql.catalog.paimon.uri=<catalog server url> \
+ --conf spark.sql.catalog.paimon.token.provider=bear \
+ --conf spark.sql.catalog.paimon.token=<token>
+
+```
+
+##### dlf ak
+```bash
+spark-sql ... \
+ --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
+ --conf spark.sql.catalog.paimon.metastore=rest \
+ --conf spark.sql.catalog.paimon.uri=<catalog server url> \
+ --conf spark.sql.catalog.paimon.token.provider=dlf \
+ --conf spark.sql.catalog.paimon.dlf.accessKeyId=<accessKeyId> \
+ --conf spark.sql.catalog.paimon.dlf.accessKeySecret=<accessKeySecret>
+
+```
+
+##### dlf sts token
+```bash
+spark-sql ... \
+ --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
+ --conf spark.sql.catalog.paimon.metastore=rest \
+ --conf spark.sql.catalog.paimon.uri=<catalog server url> \
+ --conf spark.sql.catalog.paimon.token.provider=dlf \
+ --conf spark.sql.catalog.paimon.dlf.accessKeyId=<accessKeyId> \
+ --conf spark.sql.catalog.paimon.dlf.accessKeySecret=<accessKeySecret> \
+ --conf spark.sql.catalog.paimon.dlf.securityToken=<securityToken>
+
+
+```
+
```sql
USE paimon.default;
```
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java
b/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java
new file mode 100644
index 0000000000..081ada0fba
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+/** Test-only mutable holder of a data token map and its expiry time; {@link #refresh()} regenerates both. */
+public class DataTokenProvider {
+
+ private Map<String, String> token; // current token entries; replaced wholesale by setToken/refresh
+ private long expiresAtMillis; // expiry time in milliseconds; refresh() sets it from System.currentTimeMillis()
+
+ public DataTokenProvider(Map<String, String> token, long expiresAtMillis) { // stores both arguments as-is
+ this.token = token;
+ this.expiresAtMillis = expiresAtMillis;
+ }
+
+ public void setExpiresAtMillis(long expiresAtMillis) { // overrides the expiry without touching the token
+ this.expiresAtMillis = expiresAtMillis;
+ }
+
+ public Map<String, String> getToken() { // returns the stored map reference itself, not a copy
+ return token;
+ }
+
+ public long getExpiresAtMillis() {
+ return expiresAtMillis;
+ }
+
+ public void setToken(Map<String, String> token) { // overrides the token without touching the expiry
+ this.token = token;
+ }
+
+ public void refresh() { // replaces the token with new "ak"/"sk" entries and resets the expiry
+ this.token =
+ ImmutableMap.of(
+ "ak",
+ "ak-" + System.currentTimeMillis(),
+ "sk",
+ "sk-" + System.currentTimeMillis()); // separate clock read from the "ak" value, so the two suffixes may differ
+ this.expiresAtMillis = System.currentTimeMillis(); // NOTE(review): expiry is "now", not a future time; presumably so the token counts as expired immediately, confirm
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java
b/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java
deleted file mode 100644
index bd52d7bf10..0000000000
---
a/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.TableType;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Database;
-import org.apache.paimon.catalog.FileSystemCatalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.catalog.PropertyChange;
-import org.apache.paimon.catalog.SupportsSnapshots;
-import org.apache.paimon.catalog.TableMetadata;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.partition.Partition;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaChange;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.view.View;
-
-import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import static org.apache.paimon.CoreOptions.PATH;
-
-/** A catalog for testing RESTCatalog. */
-public class MetadataInMemoryFileSystemCatalog extends FileSystemCatalog
- implements SupportsSnapshots {
-
- public final Map<String, Database> databaseStore;
- public final Map<String, TableMetadata> tableMetadataStore;
- public final Map<String, List<Partition>> tablePartitionsStore;
- public final Map<String, View> viewStore;
- public final Map<String, Snapshot> tableSnapshotStore;
- public final Map<String, RESTToken> dataTokenStore;
- public FileSystemCatalog fileSystemCatalog;
-
- public MetadataInMemoryFileSystemCatalog(
- FileIO fileIO,
- Path warehouse,
- Options options,
- Map<String, Database> databaseStore,
- Map<String, TableMetadata> tableMetadataStore,
- Map<String, Snapshot> tableSnapshotStore,
- Map<String, List<Partition>> tablePartitionsStore,
- Map<String, View> viewStore,
- Map<String, RESTToken> dataTokenStore) {
- super(fileIO, warehouse, options);
- this.fileSystemCatalog = new FileSystemCatalog(fileIO, warehouse,
options);
- this.databaseStore = databaseStore;
- this.tableMetadataStore = tableMetadataStore;
- this.tablePartitionsStore = tablePartitionsStore;
- this.tableSnapshotStore = tableSnapshotStore;
- this.viewStore = viewStore;
- this.dataTokenStore = dataTokenStore;
- }
-
- public static MetadataInMemoryFileSystemCatalog create(
- CatalogContext context,
- Map<String, Database> databaseStore,
- Map<String, TableMetadata> tableMetadataStore,
- Map<String, Snapshot> tableSnapshotStore,
- Map<String, List<Partition>> tablePartitionsStore,
- Map<String, View> viewStore,
- Map<String, RESTToken> dataTokenStore) {
- String warehouse =
CatalogFactory.warehouse(context).toUri().toString();
-
- Path warehousePath = new Path(warehouse);
- FileIO fileIO;
-
- try {
- fileIO = FileIO.get(warehousePath, context);
- fileIO.checkOrMkdirs(warehousePath);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
-
- return new MetadataInMemoryFileSystemCatalog(
- fileIO,
- warehousePath,
- context.options(),
- databaseStore,
- tableMetadataStore,
- tableSnapshotStore,
- tablePartitionsStore,
- viewStore,
- dataTokenStore);
- }
-
- @Override
- public List<String> listDatabases() {
- return new ArrayList<>(databaseStore.keySet());
- }
-
- @Override
- protected void createDatabaseImpl(String name, Map<String, String>
properties) {
- super.createDatabaseImpl(name, properties);
- databaseStore.put(name, Database.of(name, properties, null));
- }
-
- @Override
- public Database getDatabaseImpl(String name) throws
DatabaseNotExistException {
- if (databaseStore.containsKey(name)) {
- return databaseStore.get(name);
- }
- throw new DatabaseNotExistException(name);
- }
-
- @Override
- protected void dropDatabaseImpl(String name) {
- super.dropDatabaseImpl(name);
- databaseStore.remove(name);
- }
-
- protected void alterDatabaseImpl(String name, List<PropertyChange> changes)
- throws DatabaseNotExistException {
- if (databaseStore.containsKey(name)) {
- Pair<Map<String, String>, Set<String>> setPropertiesToRemoveKeys =
- PropertyChange.getSetPropertiesToRemoveKeys(changes);
- Map<String, String> setProperties =
setPropertiesToRemoveKeys.getLeft();
- Set<String> removeKeys = setPropertiesToRemoveKeys.getRight();
- Database database = databaseStore.get(name);
- Map<String, String> parameter = new HashMap<>(database.options());
- if (!setProperties.isEmpty()) {
- parameter.putAll(setProperties);
- }
- if (!removeKeys.isEmpty()) {
- parameter.keySet().removeAll(removeKeys);
- }
- Database alterDatabase = Database.of(name, parameter, null);
- databaseStore.put(name, alterDatabase);
- } else {
- throw new DatabaseNotExistException(name);
- }
- }
-
- @Override
- protected List<String> listTablesImpl(String databaseName) {
- List<String> tables = new ArrayList<>();
- for (Map.Entry<String, TableMetadata> entry :
tableMetadataStore.entrySet()) {
- Identifier identifier = Identifier.fromString(entry.getKey());
- if (databaseName.equals(identifier.getDatabaseName())) {
- tables.add(identifier.getTableName());
- }
- }
- return tables;
- }
-
- @Override
- public void createTableImpl(Identifier identifier, Schema schema) {
- super.createTableImpl(identifier, schema);
- try {
- TableMetadata tableMetadata =
- createTableMetadata(
- identifier, 1L, schema,
UUID.randomUUID().toString(), false);
- tableMetadataStore.put(identifier.getFullName(), tableMetadata);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private TableMetadata createTableMetadata(
- Identifier identifier, long schemaId, Schema schema, String uuid,
boolean isExternal) {
- Map<String, String> options = new HashMap<>(schema.options());
- Path path = getTableLocation(identifier);
- options.put(PATH.key(), path.toString());
- TableSchema tableSchema =
- new TableSchema(
- schemaId,
- schema.fields(),
- schema.fields().size() - 1,
- schema.partitionKeys(),
- schema.primaryKeys(),
- options,
- schema.comment());
- TableMetadata tableMetadata = new TableMetadata(tableSchema,
isExternal, uuid);
- return tableMetadata;
- }
-
- @Override
- protected void dropTableImpl(Identifier identifier, List<Path>
externalPaths) {
- if (tableMetadataStore.containsKey(identifier.getFullName())) {
- tableMetadataStore.remove(identifier.getFullName());
- super.dropTableImpl(identifier, externalPaths);
- }
- }
-
- @Override
- public void renameTableImpl(Identifier fromTable, Identifier toTable) {
- if (tableMetadataStore.containsKey(fromTable.getFullName())) {
- super.renameTableImpl(fromTable, toTable);
- TableMetadata tableMetadata =
tableMetadataStore.get(fromTable.getFullName());
- tableMetadataStore.remove(fromTable.getFullName());
- tableMetadataStore.put(toTable.getFullName(), tableMetadata);
- }
- }
-
- @Override
- protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
- throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
- if (tableMetadataStore.containsKey(identifier.getFullName())) {
- TableMetadata tableMetadata =
tableMetadataStore.get(identifier.getFullName());
- TableSchema schema = tableMetadata.schema();
- Options options = Options.fromMap(schema.options());
- if (options.get(CoreOptions.TYPE) == TableType.FORMAT_TABLE) {
- throw new UnsupportedOperationException("Only data table
support alter table.");
- }
- SchemaManager schemaManager = schemaManager(identifier);
- try {
- TableSchema newSchema =
- runWithLock(identifier, () ->
schemaManager.commitChanges(changes));
- TableMetadata newTableMetadata =
- createTableMetadata(
- identifier,
- newSchema.id(),
- newSchema.toSchema(),
- tableMetadata.uuid(),
- tableMetadata.isExternal());
- tableMetadataStore.put(identifier.getFullName(),
newTableMetadata);
- } catch (TableNotExistException
- | ColumnAlreadyExistException
- | ColumnNotExistException
- | RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- private SchemaManager schemaManager(Identifier identifier) {
- Path path = getTableLocation(identifier);
- return new SchemaManager(fileIO, path,
identifier.getBranchNameOrDefault());
- }
-
- @Override
- public void createFormatTable(Identifier identifier, Schema schema) {
- TableMetadata tableMetadata =
- createTableMetadata(identifier, 1L, schema,
UUID.randomUUID().toString(), true);
- tableMetadataStore.put(identifier.getFullName(), tableMetadata);
- }
-
- @Override
- public TableSchema loadTableSchema(Identifier identifier) throws
TableNotExistException {
- if (tableMetadataStore.containsKey(identifier.getFullName())) {
- return tableMetadataStore.get(identifier.getFullName()).schema();
- }
- throw new TableNotExistException(identifier);
- }
-
- @Override
- protected TableMetadata loadTableMetadata(Identifier identifier) throws
TableNotExistException {
- if (tableMetadataStore.containsKey(identifier.getFullName())) {
- return tableMetadataStore.get(identifier.getFullName());
- }
- throw new TableNotExistException(identifier);
- }
-
- @Override
- public void createPartitions(Identifier identifier, List<Map<String,
String>> partitions)
- throws TableNotExistException {
- getTable(identifier);
- tablePartitionsStore.put(
- identifier.getFullName(),
- partitions.stream()
- .map(partition -> spec2Partition(partition))
- .collect(Collectors.toList()));
- }
-
- @Override
- public void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
- throws TableNotExistException {
- getTable(identifier);
- List<Partition> existPartitions =
tablePartitionsStore.get(identifier.getFullName());
- partitions.forEach(
- partition -> {
- for (Map.Entry<String, String> entry :
partition.entrySet()) {
- existPartitions.stream()
- .filter(
- p ->
-
p.spec().containsKey(entry.getKey())
- && p.spec()
-
.get(entry.getKey())
-
.equals(entry.getValue()))
- .findFirst()
- .ifPresent(
- existPartition ->
existPartitions.remove(existPartition));
- }
- });
- }
-
- @Override
- public void alterPartitions(Identifier identifier, List<Partition>
partitions)
- throws TableNotExistException {
- getTable(identifier);
- List<Partition> existPartitions =
tablePartitionsStore.get(identifier.getFullName());
- partitions.forEach(
- partition -> {
- for (Map.Entry<String, String> entry :
partition.spec().entrySet()) {
- existPartitions.stream()
- .filter(
- p ->
-
p.spec().containsKey(entry.getKey())
- && p.spec()
-
.get(entry.getKey())
-
.equals(entry.getValue()))
- .findFirst()
- .ifPresent(
- existPartition ->
existPartitions.remove(existPartition));
- }
- });
- existPartitions.addAll(partitions);
- tablePartitionsStore.put(identifier.getFullName(), existPartitions);
- }
-
- @Override
- public List<Partition> listPartitions(Identifier identifier) throws
TableNotExistException {
- getTable(identifier);
- return tablePartitionsStore.get(identifier.getFullName());
- }
-
- @Override
- public View getView(Identifier identifier) throws ViewNotExistException {
- if (viewStore.containsKey(identifier.getFullName())) {
- return viewStore.get(identifier.getFullName());
- }
- throw new ViewNotExistException(identifier);
- }
-
- @Override
- public void dropView(Identifier identifier, boolean ignoreIfNotExists)
- throws ViewNotExistException {
- if (viewStore.containsKey(identifier.getFullName())) {
- viewStore.remove(identifier.getFullName());
- }
- if (!ignoreIfNotExists) {
- throw new ViewNotExistException(identifier);
- }
- }
-
- @Override
- public void createView(Identifier identifier, View view, boolean
ignoreIfExists)
- throws ViewAlreadyExistException, DatabaseNotExistException {
- getDatabase(identifier.getDatabaseName());
- if (viewStore.containsKey(identifier.getFullName()) &&
!ignoreIfExists) {
- throw new ViewAlreadyExistException(identifier);
- }
- viewStore.put(identifier.getFullName(), view);
- }
-
- @Override
- public List<String> listViews(String databaseName) throws
DatabaseNotExistException {
- getDatabase(databaseName);
- return viewStore.keySet().stream()
- .map(Identifier::fromString)
- .filter(identifier ->
identifier.getDatabaseName().equals(databaseName))
- .map(Identifier::getTableName)
- .collect(Collectors.toList());
- }
-
- @Override
- public void renameView(Identifier fromView, Identifier toView, boolean
ignoreIfNotExists)
- throws ViewNotExistException, ViewAlreadyExistException {
- if (!viewStore.containsKey(fromView.getFullName()) &&
!ignoreIfNotExists) {
- throw new ViewNotExistException(fromView);
- }
- if (viewStore.containsKey(toView.getFullName())) {
- throw new ViewAlreadyExistException(toView);
- }
- if (viewStore.containsKey(fromView.getFullName())) {
- View view = viewStore.get(fromView.getFullName());
- viewStore.remove(fromView.getFullName());
- viewStore.put(toView.getFullName(), view);
- }
- }
-
- @Override
- public boolean commitSnapshot(
- Identifier identifier, Snapshot snapshot, List<Partition>
statistics)
- throws TableNotExistException {
- tableSnapshotStore.put(identifier.getFullName(), snapshot);
- return false;
- }
-
- @Override
- public Optional<Snapshot> loadSnapshot(Identifier identifier) throws
TableNotExistException {
- return
Optional.ofNullable(tableSnapshotStore.get(identifier.getFullName()));
- }
-
- public RESTToken getToken(Identifier identifier) {
- if (dataTokenStore.containsKey(identifier.getFullName())) {
- return dataTokenStore.get(identifier.getFullName());
- }
- long currentTimeMillis = System.currentTimeMillis();
- RESTToken token =
- new RESTToken(
- ImmutableMap.of(
- "akId",
- "akId" + currentTimeMillis,
- "akSecret",
- "akSecret" + currentTimeMillis),
- currentTimeMillis);
- dataTokenStore.put(identifier.getFullName(), token);
- return dataTokenStore.get(identifier.getFullName());
- }
-
- private Partition spec2Partition(Map<String, String> spec) {
- // todo: need update
- return new Partition(spec, 123, 456, 789, 123);
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 15de6189c1..30d7b0a598 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -23,10 +23,13 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Database;
+import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.catalog.RenamingSnapshotCommit;
import org.apache.paimon.catalog.TableMetadata;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
@@ -61,11 +64,11 @@ import
org.apache.paimon.rest.responses.ListPartitionsResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;
import org.apache.paimon.rest.responses.ListViewsResponse;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FormatTable;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.view.View;
import org.apache.paimon.view.ViewImpl;
import org.apache.paimon.view.ViewSchema;
@@ -76,17 +79,23 @@ import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.TYPE;
+import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
/** Mock REST server for testing. */
@@ -95,7 +104,7 @@ public class RESTCatalogServer {
private static final String PREFIX = "paimon";
private static final String DATABASE_URI =
String.format("/v1/%s/databases", PREFIX);
- private final MetadataInMemoryFileSystemCatalog catalog;
+ private final FileSystemCatalog catalog;
private final Dispatcher dispatcher;
private final MockWebServer server;
private final String authToken;
@@ -111,16 +120,17 @@ public class RESTCatalogServer {
authToken = initToken;
Options conf = new Options();
conf.setString("warehouse", warehouse);
- this.catalog =
- MetadataInMemoryFileSystemCatalog.create(
- CatalogContext.create(conf),
- databaseStore,
- tableMetadataStore,
- tableSnapshotStore,
- tablePartitionsStore,
- viewStore,
- dataTokenStore);
- this.dispatcher = initDispatcher(catalog, warehouse, authToken);
+ CatalogContext context = CatalogContext.create(conf);
+ Path warehousePath = new Path(warehouse);
+ FileIO fileIO;
+ try {
+ fileIO = FileIO.get(warehousePath, context);
+ fileIO.checkOrMkdirs(warehousePath);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ this.catalog = new FileSystemCatalog(fileIO, warehousePath,
context.options());
+ this.dispatcher = initDispatcher(warehouse, authToken);
MockWebServer mockWebServer = new MockWebServer();
mockWebServer.setDispatcher(dispatcher);
server = mockWebServer;
@@ -146,8 +156,7 @@ public class RESTCatalogServer {
dataTokenStore.put(identifier.getFullName(), token);
}
- public static Dispatcher initDispatcher(
- MetadataInMemoryFileSystemCatalog catalog, String warehouse,
String authToken) {
+ public Dispatcher initDispatcher(String warehouse, String authToken) {
return new Dispatcher() {
@Override
public MockResponse dispatch(RecordedRequest request) {
@@ -159,17 +168,27 @@ public class RESTCatalogServer {
return new MockResponse().setResponseCode(401);
}
if (request.getPath().startsWith("/v1/config")) {
- return new MockResponse()
- .setResponseCode(200)
- .setBody(getConfigBody(warehouse));
+ String body =
+ String.format(
+ "{\"defaults\": {\"%s\": \"%s\",
\"%s\": \"%s\", \"%s\": \"%s\"}}",
+
RESTCatalogInternalOptions.PREFIX.key(),
+ PREFIX,
+ CatalogOptions.WAREHOUSE.key(),
+ warehouse,
+ "header.test-header",
+ "test-value");
+ return new
MockResponse().setResponseCode(200).setBody(body);
} else if (DATABASE_URI.equals(request.getPath())) {
- return databasesApiHandler(catalog, request);
+ return databasesApiHandler(request);
} else if (request.getPath().startsWith(DATABASE_URI)) {
String[] resources =
request.getPath()
.substring((DATABASE_URI +
"/").length())
.split("/");
String databaseName = resources[0];
+ if (!databaseStore.containsKey(databaseName)) {
+ throw new
Catalog.DatabaseNotExistException(databaseName);
+ }
boolean isViews = resources.length == 2 &&
"views".equals(resources[1]);
boolean isTables = resources.length == 2 &&
"tables".equals(resources[1]);
boolean isTableRename =
@@ -226,44 +245,28 @@ public class RESTCatalogServer {
resources.length >= 4
&& "tables".equals(resources[1])
&& "branches".equals(resources[3]);
- if (isDropPartitions) {
+ Identifier identifier =
+ resources.length >= 3
+ ? Identifier.create(databaseName,
resources[2])
+ : null;
+ // validate partition
+ if (isPartitions
+ || isDropPartitions
+ || isAlterPartitions
+ || isMarkDonePartitions) {
String tableName = resources[2];
- Identifier identifier =
Identifier.create(databaseName, tableName);
Optional<MockResponse> error =
- checkTablePartitioned(catalog, identifier);
+ checkTablePartitioned(
+ Identifier.create(databaseName,
tableName));
if (error.isPresent()) {
return error.get();
}
- DropPartitionsRequest dropPartitionsRequest =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
- DropPartitionsRequest.class);
- catalog.dropPartitions(
- identifier,
dropPartitionsRequest.getPartitionSpecs());
- return new MockResponse().setResponseCode(200);
+ }
+ if (isDropPartitions) {
+ return dropPartitionsHandle(identifier, request);
} else if (isAlterPartitions) {
- String tableName = resources[2];
- Identifier identifier =
Identifier.create(databaseName, tableName);
- Optional<MockResponse> error =
- checkTablePartitioned(catalog, identifier);
- if (error.isPresent()) {
- return error.get();
- }
- AlterPartitionsRequest alterPartitionsRequest =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
- AlterPartitionsRequest.class);
- catalog.alterPartitions(
- identifier,
alterPartitionsRequest.getPartitions());
- return new MockResponse().setResponseCode(200);
+ return alterPartitionsHandle(identifier, request);
} else if (isMarkDonePartitions) {
- String tableName = resources[2];
- Identifier identifier =
Identifier.create(databaseName, tableName);
- Optional<MockResponse> error =
- checkTablePartitioned(catalog, identifier);
- if (error.isPresent()) {
- return error.get();
- }
MarkDonePartitionsRequest
markDonePartitionsRequest =
OBJECT_MAPPER.readValue(
request.getBody().readUtf8(),
@@ -272,17 +275,8 @@ public class RESTCatalogServer {
identifier,
markDonePartitionsRequest.getPartitionSpecs());
return new MockResponse().setResponseCode(200);
} else if (isPartitions) {
- String tableName = resources[2];
- Optional<MockResponse> error =
- checkTablePartitioned(
- catalog,
Identifier.create(databaseName, tableName));
- if (error.isPresent()) {
- return error.get();
- }
- return partitionsApiHandler(catalog, request,
databaseName, tableName);
+ return partitionsApiHandler(request, identifier);
} else if (isBranches) {
- String tableName = resources[2];
- Identifier identifier =
Identifier.create(databaseName, tableName);
FileStoreTable table = (FileStoreTable)
catalog.getTable(identifier);
BranchManager branchManager =
table.branchManager();
switch (request.getMethod()) {
@@ -318,56 +312,25 @@ public class RESTCatalogServer {
return new
MockResponse().setResponseCode(404);
}
} else if (isTableToken) {
- RESTToken dataToken =
-
catalog.getToken(Identifier.create(databaseName, resources[2]));
- GetTableTokenResponse getTableTokenResponse =
- new GetTableTokenResponse(
- dataToken.token(),
dataToken.expireAtMillis());
- return new MockResponse()
- .setResponseCode(200)
- .setBody(
- OBJECT_MAPPER.writeValueAsString(
- getTableTokenResponse));
+ return handleDataToken(identifier);
} else if (isTableSnapshot) {
- String tableName = resources[2];
- Optional<Snapshot> snapshotOptional =
- catalog.loadSnapshot(
- Identifier.create(databaseName,
tableName));
- if (!snapshotOptional.isPresent()) {
- response =
- new ErrorResponse(
-
ErrorResponseResourceType.SNAPSHOT,
- databaseName,
- "No Snapshot",
- 404);
- return mockResponse(response, 404);
- }
- GetTableSnapshotResponse getTableSnapshotResponse =
- new
GetTableSnapshotResponse(snapshotOptional.get());
- return new MockResponse()
- .setResponseCode(200)
- .setBody(
- OBJECT_MAPPER.writeValueAsString(
- getTableSnapshotResponse));
+ return handleSnapshot(identifier);
} else if (isTableRename) {
- return renameTableApiHandler(catalog, request);
+ return renameTableApiHandler(request);
} else if (isTableCommit) {
- return commitTableApiHandler(
- catalog, request, databaseName,
resources[2]);
+ return commitTableApiHandler(request);
} else if (isTable) {
- String tableName = resources[2];
- return tableApiHandler(catalog, request,
databaseName, tableName);
+ return tableApiHandler(request, identifier);
} else if (isTables) {
- return tablesApiHandler(catalog, request,
databaseName);
+ return tablesApiHandler(request, databaseName);
} else if (isViews) {
- return viewsApiHandler(catalog, request,
databaseName);
+ return viewsApiHandler(request, databaseName);
} else if (isViewRename) {
- return renameViewApiHandler(catalog, request);
+ return renameViewApiHandler(request);
} else if (isView) {
- String viewName = resources[2];
- return viewApiHandler(catalog, request,
databaseName, viewName);
+ return viewApiHandler(request, identifier);
} else {
- return databaseApiHandler(catalog, request,
databaseName);
+ return databaseApiHandler(request, databaseName);
}
}
return new MockResponse().setResponseCode(404);
@@ -458,31 +421,69 @@ public class RESTCatalogServer {
};
}
- private static Optional<MockResponse> checkTablePartitioned(
- Catalog catalog, Identifier identifier) {
- Table table;
- try {
- table = catalog.getTable(identifier);
- } catch (Catalog.TableNotExistException e) {
- return Optional.of(
- mockResponse(
- new ErrorResponse(ErrorResponseResourceType.TABLE,
null, "", 404),
- 404));
+ private MockResponse handleDataToken(Identifier tableIdentifier) throws
Exception {
+ RESTToken dataToken;
+ if (dataTokenStore.containsKey(tableIdentifier.getFullName())) {
+ dataToken = dataTokenStore.get(tableIdentifier.getFullName());
+ } else {
+ long currentTimeMillis = System.currentTimeMillis();
+ dataToken =
+ new RESTToken(
+ ImmutableMap.of(
+ "akId",
+ "akId" + currentTimeMillis,
+ "akSecret",
+ "akSecret" + currentTimeMillis),
+ currentTimeMillis);
+ dataTokenStore.put(tableIdentifier.getFullName(), dataToken);
}
- boolean partitioned =
CoreOptions.fromMap(table.options()).partitionedTableInMetastore();
- if (!partitioned) {
- return Optional.of(mockResponse(new ErrorResponse(null, null, "",
501), 501));
+ GetTableTokenResponse getTableTokenResponse =
+ new GetTableTokenResponse(dataToken.token(),
dataToken.expireAtMillis());
+ return new MockResponse()
+ .setResponseCode(200)
+
.setBody(OBJECT_MAPPER.writeValueAsString(getTableTokenResponse));
+ }
+
+ private MockResponse handleSnapshot(Identifier identifier) throws
Exception {
+ RESTResponse response;
+ Optional<Snapshot> snapshotOptional =
+
Optional.ofNullable(tableSnapshotStore.get(identifier.getFullName()));
+ if (!snapshotOptional.isPresent()) {
+ response =
+ new ErrorResponse(
+ ErrorResponseResourceType.SNAPSHOT,
+ identifier.getDatabaseName(),
+ "No Snapshot",
+ 404);
+ return mockResponse(response, 404);
}
- return Optional.empty();
+ GetTableSnapshotResponse getTableSnapshotResponse =
+ new GetTableSnapshotResponse(snapshotOptional.get());
+ return new MockResponse()
+ .setResponseCode(200)
+
.setBody(OBJECT_MAPPER.writeValueAsString(getTableSnapshotResponse));
}
- private static MockResponse commitTableApiHandler(
- Catalog catalog, RecordedRequest request, String databaseName,
String tableName)
- throws Exception {
+ private Optional<MockResponse> checkTablePartitioned(Identifier
identifier) {
+ if (tableMetadataStore.containsKey(identifier.getFullName())) {
+ TableMetadata tableMetadata =
tableMetadataStore.get(identifier.getFullName());
+ boolean partitioned =
+ CoreOptions.fromMap(tableMetadata.schema().options())
+ .partitionedTableInMetastore();
+ if (!partitioned) {
+ return Optional.of(mockResponse(new ErrorResponse(null, null,
"", 501), 501));
+ }
+ return Optional.empty();
+ }
+ return Optional.of(
+ mockResponse(
+ new ErrorResponse(ErrorResponseResourceType.TABLE,
null, "", 404), 404));
+ }
+
+ private MockResponse commitTableApiHandler(RecordedRequest request) throws
Exception {
CommitTableRequest requestBody =
OBJECT_MAPPER.readValue(request.getBody().readUtf8(),
CommitTableRequest.class);
- FileStoreTable table =
- (FileStoreTable)
catalog.getTable(Identifier.create(databaseName, tableName));
+ FileStoreTable table = (FileStoreTable)
catalog.getTable(requestBody.getIdentifier());
RenamingSnapshotCommit commit =
new RenamingSnapshotCommit(table.snapshotManager(),
Lock.empty());
String branchName = requestBody.getIdentifier().getBranchName();
@@ -491,16 +492,16 @@ public class RESTCatalogServer {
}
boolean success =
commit.commit(requestBody.getSnapshot(), branchName,
Collections.emptyList());
+ commitSnapshot(requestBody.getIdentifier(), requestBody.getSnapshot(),
null);
CommitTableResponse response = new CommitTableResponse(success);
return mockResponse(response, 200);
}
- private static MockResponse databasesApiHandler(Catalog catalog,
RecordedRequest request)
- throws Exception {
+ private MockResponse databasesApiHandler(RecordedRequest request) throws
Exception {
RESTResponse response;
switch (request.getMethod()) {
case "GET":
- List<String> databaseNameList = catalog.listDatabases();
+ List<String> databaseNameList = new
ArrayList<>(databaseStore.keySet());
response = new ListDatabasesResponse(databaseNameList);
return mockResponse(response, 200);
case "POST":
@@ -509,6 +510,8 @@ public class RESTCatalogServer {
request.getBody().readUtf8(),
CreateDatabaseRequest.class);
String databaseName = requestBody.getName();
catalog.createDatabase(databaseName, false);
+ databaseStore.put(
+ databaseName, Database.of(databaseName,
requestBody.getOptions(), null));
response = new CreateDatabaseResponse(databaseName,
requestBody.getOptions());
return mockResponse(response, 200);
default:
@@ -516,124 +519,215 @@ public class RESTCatalogServer {
}
}
- private static MockResponse databaseApiHandler(
- Catalog catalog, RecordedRequest request, String databaseName)
throws Exception {
+ private MockResponse databaseApiHandler(RecordedRequest request, String
databaseName)
+ throws Exception {
RESTResponse response;
- switch (request.getMethod()) {
- case "GET":
- Database database = catalog.getDatabase(databaseName);
- response =
- new GetDatabaseResponse(
- UUID.randomUUID().toString(), database.name(),
database.options());
- return mockResponse(response, 200);
- case "DELETE":
- catalog.dropDatabase(databaseName, false, true);
- return new MockResponse().setResponseCode(200);
- case "POST":
- AlterDatabaseRequest requestBody =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
AlterDatabaseRequest.class);
- List<PropertyChange> changes = new ArrayList<>();
- for (String property : requestBody.getRemovals()) {
- changes.add(PropertyChange.removeProperty(property));
- }
- for (Map.Entry<String, String> entry :
requestBody.getUpdates().entrySet()) {
- changes.add(PropertyChange.setProperty(entry.getKey(),
entry.getValue()));
- }
- catalog.alterDatabase(databaseName, changes, false);
- AlterDatabaseResponse alterDatabaseResponse =
- new AlterDatabaseResponse(
- requestBody.getRemovals(),
- requestBody.getUpdates().keySet().stream()
- .collect(Collectors.toList()),
- Collections.emptyList());
- return mockResponse(alterDatabaseResponse, 200);
- default:
- return new MockResponse().setResponseCode(404);
+ Database database;
+ if (databaseStore.containsKey(databaseName)) {
+ switch (request.getMethod()) {
+ case "GET":
+ database = databaseStore.get(databaseName);
+ response =
+ new GetDatabaseResponse(
+ UUID.randomUUID().toString(),
+ database.name(),
+ database.options());
+ return mockResponse(response, 200);
+ case "DELETE":
+ catalog.dropDatabase(databaseName, false, true);
+ databaseStore.remove(databaseName);
+ return new MockResponse().setResponseCode(200);
+ case "POST":
+ AlterDatabaseRequest requestBody =
+ OBJECT_MAPPER.readValue(
+ request.getBody().readUtf8(),
AlterDatabaseRequest.class);
+ List<PropertyChange> changes = new ArrayList<>();
+ for (String property : requestBody.getRemovals()) {
+ changes.add(PropertyChange.removeProperty(property));
+ }
+ for (Map.Entry<String, String> entry :
requestBody.getUpdates().entrySet()) {
+ changes.add(PropertyChange.setProperty(entry.getKey(),
entry.getValue()));
+ }
+ if (databaseStore.containsKey(databaseName)) {
+ Pair<Map<String, String>, Set<String>>
setPropertiesToRemoveKeys =
+
PropertyChange.getSetPropertiesToRemoveKeys(changes);
+ Map<String, String> setProperties =
setPropertiesToRemoveKeys.getLeft();
+ Set<String> removeKeys =
setPropertiesToRemoveKeys.getRight();
+ database = databaseStore.get(databaseName);
+ Map<String, String> parameter = new
HashMap<>(database.options());
+ if (!setProperties.isEmpty()) {
+ parameter.putAll(setProperties);
+ }
+ if (!removeKeys.isEmpty()) {
+ parameter.keySet().removeAll(removeKeys);
+ }
+ Database alterDatabase = Database.of(databaseName,
parameter, null);
+ databaseStore.put(databaseName, alterDatabase);
+ } else {
+ throw new
Catalog.DatabaseNotExistException(databaseName);
+ }
+ AlterDatabaseResponse alterDatabaseResponse =
+ new AlterDatabaseResponse(
+ requestBody.getRemovals(),
+ requestBody.getUpdates().keySet().stream()
+ .collect(Collectors.toList()),
+ Collections.emptyList());
+ return mockResponse(alterDatabaseResponse, 200);
+ default:
+ return new MockResponse().setResponseCode(404);
+ }
}
+ return new MockResponse().setResponseCode(404);
}
- private static MockResponse tablesApiHandler(
- Catalog catalog, RecordedRequest request, String databaseName)
throws Exception {
+ private MockResponse tablesApiHandler(RecordedRequest request, String
databaseName)
+ throws Exception {
RESTResponse response;
- switch (request.getMethod()) {
- case "GET":
- response = new
ListTablesResponse(catalog.listTables(databaseName));
- return mockResponse(response, 200);
- case "POST":
- CreateTableRequest requestBody =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
CreateTableRequest.class);
- catalog.createTable(requestBody.getIdentifier(),
requestBody.getSchema(), false);
- return new MockResponse().setResponseCode(200);
- default:
- return new MockResponse().setResponseCode(404);
+ if (databaseStore.containsKey(databaseName)) {
+ switch (request.getMethod()) {
+ case "GET":
+ List<String> tables = new ArrayList<>();
+ for (Map.Entry<String, TableMetadata> entry :
tableMetadataStore.entrySet()) {
+ Identifier identifier =
Identifier.fromString(entry.getKey());
+ if (databaseName.equals(identifier.getDatabaseName()))
{
+ tables.add(identifier.getTableName());
+ }
+ }
+ response = new ListTablesResponse(tables);
+ return mockResponse(response, 200);
+ case "POST":
+ CreateTableRequest requestBody =
+ OBJECT_MAPPER.readValue(
+ request.getBody().readUtf8(),
CreateTableRequest.class);
+ Identifier identifier = requestBody.getIdentifier();
+ Schema schema = requestBody.getSchema();
+ TableMetadata tableMetadata;
+ if (isFormatTable(schema)) {
+ tableMetadata = createFormatTable(identifier, schema);
+ } else {
+ catalog.createTable(identifier, schema, false);
+ tableMetadata =
+ createTableMetadata(
+ requestBody.getIdentifier(),
+ 1L,
+ requestBody.getSchema(),
+ UUID.randomUUID().toString(),
+ false);
+ }
+ tableMetadataStore.put(
+ requestBody.getIdentifier().getFullName(),
tableMetadata);
+ return new MockResponse().setResponseCode(200);
+ default:
+ return new MockResponse().setResponseCode(404);
+ }
}
+ return mockResponse(
+ new ErrorResponse(ErrorResponseResourceType.DATABASE, null,
"", 404), 404);
+ }
+
+ private boolean isFormatTable(Schema schema) {
+ return Options.fromMap(schema.options()).get(TYPE) == FORMAT_TABLE;
}
- private static MockResponse tableApiHandler(
- Catalog catalog, RecordedRequest request, String databaseName,
String tableName)
+ private MockResponse tableApiHandler(RecordedRequest request, Identifier
identifier)
throws Exception {
RESTResponse response;
- Identifier identifier = Identifier.create(databaseName, tableName);
- switch (request.getMethod()) {
- case "GET":
- response = getTable(catalog, databaseName, tableName);
- return mockResponse(response, 200);
- case "POST":
- AlterTableRequest requestBody =
- OBJECT_MAPPER.readValue(
- request.getBody().readUtf8(),
AlterTableRequest.class);
- catalog.alterTable(identifier, requestBody.getChanges(),
false);
- return new MockResponse().setResponseCode(200);
- case "DELETE":
- catalog.dropTable(identifier, false);
- return new MockResponse().setResponseCode(200);
- default:
- return new MockResponse().setResponseCode(404);
+ if (tableMetadataStore.containsKey(identifier.getFullName())) {
+ switch (request.getMethod()) {
+ case "GET":
+ TableMetadata tableMetadata =
tableMetadataStore.get(identifier.getFullName());
+ response =
+ new GetTableResponse(
+ tableMetadata.uuid(),
+ identifier.getTableName(),
+ tableMetadata.isExternal(),
+ tableMetadata.schema().id(),
+ tableMetadata.schema().toSchema());
+ return mockResponse(response, 200);
+ case "POST":
+ AlterTableRequest requestBody =
+ OBJECT_MAPPER.readValue(
+ request.getBody().readUtf8(),
AlterTableRequest.class);
+ alterTableImpl(identifier, requestBody.getChanges());
+ return new MockResponse().setResponseCode(200);
+ case "DELETE":
+ try {
+ catalog.dropTable(identifier, false);
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ }
+ tableMetadataStore.remove(identifier.getFullName());
+ return new MockResponse().setResponseCode(200);
+ default:
+ return new MockResponse().setResponseCode(404);
+ }
+ } else {
+ throw new Catalog.TableNotExistException(identifier);
}
}
- private static MockResponse renameTableApiHandler(Catalog catalog,
RecordedRequest request)
- throws Exception {
+ private MockResponse renameTableApiHandler(RecordedRequest request) throws
Exception {
RenameTableRequest requestBody =
OBJECT_MAPPER.readValue(request.getBody().readUtf8(),
RenameTableRequest.class);
- catalog.renameTable(requestBody.getSource(),
requestBody.getDestination(), false);
+ Identifier fromTable = requestBody.getSource();
+ Identifier toTable = requestBody.getDestination();
+ if (tableMetadataStore.containsKey(fromTable.getFullName())) {
+ TableMetadata tableMetadata =
tableMetadataStore.get(fromTable.getFullName());
+ if (!isFormatTable(tableMetadata.schema().toSchema())) {
+ catalog.renameTable(requestBody.getSource(),
requestBody.getDestination(), false);
+ }
+ tableMetadataStore.remove(fromTable.getFullName());
+ tableMetadataStore.put(toTable.getFullName(), tableMetadata);
+ } else {
+ throw new Catalog.TableNotExistException(fromTable);
+ }
return new MockResponse().setResponseCode(200);
}
- private static MockResponse partitionsApiHandler(
- Catalog catalog, RecordedRequest request, String databaseName,
String tableName)
+ private MockResponse partitionsApiHandler(RecordedRequest request,
Identifier tableIdentifier)
throws Exception {
RESTResponse response;
- Identifier identifier = Identifier.create(databaseName, tableName);
switch (request.getMethod()) {
case "GET":
- List<Partition> partitions =
catalog.listPartitions(identifier);
+ List<Partition> partitions =
+
tablePartitionsStore.get(tableIdentifier.getFullName());
response = new ListPartitionsResponse(partitions);
return mockResponse(response, 200);
case "POST":
CreatePartitionsRequest requestBody =
OBJECT_MAPPER.readValue(
request.getBody().readUtf8(),
CreatePartitionsRequest.class);
- catalog.createPartitions(identifier,
requestBody.getPartitionSpecs());
+ tablePartitionsStore.put(
+ tableIdentifier.getFullName(),
+ requestBody.getPartitionSpecs().stream()
+ .map(partition -> spec2Partition(partition))
+ .collect(Collectors.toList()));
return new MockResponse().setResponseCode(200);
default:
return new MockResponse().setResponseCode(404);
}
}
- private static MockResponse viewsApiHandler(
- Catalog catalog, RecordedRequest request, String databaseName)
throws Exception {
+ private MockResponse viewsApiHandler(RecordedRequest request, String
databaseName)
+ throws Exception {
RESTResponse response;
switch (request.getMethod()) {
case "GET":
- response = new
ListViewsResponse(catalog.listViews(databaseName));
+ List<String> views =
+ viewStore.keySet().stream()
+ .map(Identifier::fromString)
+ .filter(
+ identifier ->
+
identifier.getDatabaseName().equals(databaseName))
+ .map(Identifier::getTableName)
+ .collect(Collectors.toList());
+ response = new ListViewsResponse(views);
return mockResponse(response, 200);
case "POST":
CreateViewRequest requestBody =
OBJECT_MAPPER.readValue(
request.getBody().readUtf8(),
CreateViewRequest.class);
+ Identifier identifier = requestBody.getIdentifier();
ViewSchema schema = requestBody.getSchema();
ViewImpl view =
new ViewImpl(
@@ -643,71 +737,178 @@ public class RESTCatalogServer {
schema.dialects(),
schema.comment(),
schema.options());
- catalog.createView(requestBody.getIdentifier(), view, false);
+ if (viewStore.containsKey(identifier.getFullName())) {
+ throw new Catalog.ViewAlreadyExistException(identifier);
+ }
+ viewStore.put(identifier.getFullName(), view);
return new MockResponse().setResponseCode(200);
default:
return new MockResponse().setResponseCode(404);
}
}
- private static MockResponse viewApiHandler(
- Catalog catalog, RecordedRequest request, String databaseName,
String viewName)
+ private MockResponse viewApiHandler(RecordedRequest request, Identifier
identifier)
throws Exception {
RESTResponse response;
- Identifier identifier = Identifier.create(databaseName, viewName);
- switch (request.getMethod()) {
- case "GET":
- View view = catalog.getView(identifier);
- ViewSchema schema =
- new ViewSchema(
- view.rowType().getFields(),
- view.query(),
- view.dialects(),
- view.comment().orElse(null),
- view.options());
- response = new GetViewResponse("id",
identifier.getTableName(), schema);
- return mockResponse(response, 200);
- case "DELETE":
- catalog.dropView(identifier, false);
- return new MockResponse().setResponseCode(200);
- default:
- return new MockResponse().setResponseCode(404);
+ if (viewStore.containsKey(identifier.getFullName())) {
+ switch (request.getMethod()) {
+ case "GET":
+ if (viewStore.containsKey(identifier.getFullName())) {
+ View view = viewStore.get(identifier.getFullName());
+ ViewSchema schema =
+ new ViewSchema(
+ view.rowType().getFields(),
+ view.query(),
+ view.dialects(),
+ view.comment().orElse(null),
+ view.options());
+ response = new GetViewResponse("id",
identifier.getTableName(), schema);
+ return mockResponse(response, 200);
+ }
+ throw new Catalog.ViewNotExistException(identifier);
+ case "DELETE":
+ viewStore.remove(identifier.getFullName());
+ return new MockResponse().setResponseCode(200);
+ default:
+ return new MockResponse().setResponseCode(404);
+ }
}
+ throw new Catalog.ViewNotExistException(identifier);
}
- private static MockResponse renameViewApiHandler(Catalog catalog,
RecordedRequest request)
- throws Exception {
+ private MockResponse renameViewApiHandler(RecordedRequest request) throws
Exception {
RenameTableRequest requestBody =
OBJECT_MAPPER.readValue(request.getBody().readUtf8(),
RenameTableRequest.class);
- catalog.renameView(requestBody.getSource(),
requestBody.getDestination(), false);
+ Identifier fromView = requestBody.getSource();
+ Identifier toView = requestBody.getDestination();
+ if (!viewStore.containsKey(fromView.getFullName())) {
+ throw new Catalog.ViewNotExistException(fromView);
+ }
+ if (viewStore.containsKey(toView.getFullName())) {
+ throw new Catalog.ViewAlreadyExistException(toView);
+ }
+ if (viewStore.containsKey(fromView.getFullName())) {
+ View view = viewStore.get(fromView.getFullName());
+ viewStore.remove(fromView.getFullName());
+ viewStore.put(toView.getFullName(), view);
+ }
return new MockResponse().setResponseCode(200);
}
- private static GetTableResponse getTable(Catalog catalog, String
databaseName, String tableName)
- throws Exception {
- Identifier identifier = Identifier.create(databaseName, tableName);
- Table table = catalog.getTable(identifier);
- Schema schema;
- Long schemaId = 1L;
- if (table instanceof FileStoreTable) {
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- schema = fileStoreTable.schema().toSchema();
- schemaId = fileStoreTable.schema().id();
+ protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
+ throws Catalog.TableNotExistException,
Catalog.ColumnAlreadyExistException,
+ Catalog.ColumnNotExistException {
+ if (tableMetadataStore.containsKey(identifier.getFullName())) {
+ TableMetadata tableMetadata =
tableMetadataStore.get(identifier.getFullName());
+ TableSchema schema = tableMetadata.schema();
+ if (isFormatTable(schema.toSchema())) {
+ throw new UnsupportedOperationException("Only data table
support alter table.");
+ }
+ try {
+ catalog.alterTable(identifier, changes, false);
+ FileStoreTable table = (FileStoreTable)
catalog.getTable(identifier);
+ TableSchema newSchema = table.schema();
+ TableMetadata newTableMetadata =
+ createTableMetadata(
+ identifier,
+ newSchema.id(),
+ newSchema.toSchema(),
+ tableMetadata.uuid(),
+ tableMetadata.isExternal());
+ tableMetadataStore.put(identifier.getFullName(),
newTableMetadata);
+ } catch (Catalog.TableNotExistException
+ | Catalog.ColumnAlreadyExistException
+ | Catalog.ColumnNotExistException
+ | RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private boolean commitSnapshot(
+ Identifier identifier, Snapshot snapshot, List<Partition>
statistics)
+ throws Catalog.TableNotExistException {
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ RenamingSnapshotCommit commit =
+ new RenamingSnapshotCommit(table.snapshotManager(),
Lock.empty());
+ String branchName = identifier.getBranchName();
+ if (branchName == null) {
+ branchName = "main";
+ }
+ try {
+ boolean success = commit.commit(snapshot, branchName,
Collections.emptyList());
+ tableSnapshotStore.put(identifier.getFullName(), snapshot);
+ return success;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private MockResponse dropPartitionsHandle(Identifier identifier,
RecordedRequest request)
+ throws Catalog.TableNotExistException, JsonProcessingException {
+ DropPartitionsRequest dropPartitionsRequest =
+ OBJECT_MAPPER.readValue(request.getBody().readUtf8(),
DropPartitionsRequest.class);
+ List<Map<String, String>> partitionSpecs =
dropPartitionsRequest.getPartitionSpecs();
+ if (tableMetadataStore.containsKey(identifier.getFullName())) {
+ List<Partition> existPartitions =
tablePartitionsStore.get(identifier.getFullName());
+ partitionSpecs.forEach(
+ partition -> {
+ for (Map.Entry<String, String> entry :
partition.entrySet()) {
+ existPartitions.stream()
+ .filter(
+ p ->
+
p.spec().containsKey(entry.getKey())
+ && p.spec()
+
.get(entry.getKey())
+
.equals(entry.getValue()))
+ .findFirst()
+ .ifPresent(
+ existPartition ->
+
existPartitions.remove(existPartition));
+ }
+ });
+ return new MockResponse().setResponseCode(200);
+
} else {
- FormatTable formatTable = (FormatTable) table;
- List<DataField> fields = formatTable.rowType().getFields();
- schema =
- new Schema(
- fields,
- table.partitionKeys(),
- table.primaryKeys(),
- table.options(),
- table.comment().orElse(null));
+ throw new Catalog.TableNotExistException(identifier);
}
- return new GetTableResponse(table.uuid(), table.name(), false,
schemaId, schema);
}
- private static MockResponse mockResponse(RESTResponse response, int
httpCode) {
+ private MockResponse alterPartitionsHandle(Identifier identifier,
RecordedRequest request)
+ throws Catalog.TableNotExistException, JsonProcessingException {
+ if (tableMetadataStore.containsKey(identifier.getFullName())) {
+ AlterPartitionsRequest alterPartitionsRequest =
+ OBJECT_MAPPER.readValue(
+ request.getBody().readUtf8(),
AlterPartitionsRequest.class);
+ List<Partition> partitions =
alterPartitionsRequest.getPartitions();
+ List<Partition> existPartitions =
tablePartitionsStore.get(identifier.getFullName());
+ partitions.forEach(
+ partition -> {
+ for (Map.Entry<String, String> entry :
partition.spec().entrySet()) {
+ existPartitions.stream()
+ .filter(
+ p ->
+
p.spec().containsKey(entry.getKey())
+ && p.spec()
+
.get(entry.getKey())
+
.equals(entry.getValue()))
+ .findFirst()
+ .ifPresent(
+ existPartition ->
+
existPartitions.remove(existPartition));
+ }
+ });
+ existPartitions.addAll(partitions);
+ tablePartitionsStore.put(identifier.getFullName(),
existPartitions);
+ return new MockResponse().setResponseCode(200);
+ } else {
+ throw new Catalog.TableNotExistException(identifier);
+ }
+ }
+
+ private MockResponse mockResponse(RESTResponse response, int httpCode) {
try {
return new MockResponse()
.setResponseCode(httpCode)
@@ -718,14 +919,30 @@ public class RESTCatalogServer {
}
}
- private static String getConfigBody(String warehouseStr) {
- return String.format(
- "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\", \"%s\":
\"%s\"}}",
- RESTCatalogInternalOptions.PREFIX.key(),
- PREFIX,
- CatalogOptions.WAREHOUSE.key(),
- warehouseStr,
- "header.test-header",
- "test-value");
+ private TableMetadata createTableMetadata(
+ Identifier identifier, long schemaId, Schema schema, String uuid,
boolean isExternal) {
+ Map<String, String> options = new HashMap<>(schema.options());
+ Path path = catalog.getTableLocation(identifier);
+ options.put(PATH.key(), path.toString());
+ TableSchema tableSchema =
+ new TableSchema(
+ schemaId,
+ schema.fields(),
+ schema.fields().size() - 1,
+ schema.partitionKeys(),
+ schema.primaryKeys(),
+ options,
+ schema.comment());
+ TableMetadata tableMetadata = new TableMetadata(tableSchema,
isExternal, uuid);
+ return tableMetadata;
+ }
+
+ private TableMetadata createFormatTable(Identifier identifier, Schema
schema) {
+ return createTableMetadata(identifier, 1L, schema,
UUID.randomUUID().toString(), true);
+ }
+
+ private Partition spec2Partition(Map<String, String> spec) {
+ // todo: need update
+ return new Partition(spec, 123, 456, 789, 123);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index b9a550950c..92c6db658d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -23,15 +23,25 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.rest.auth.AuthProviderEnum;
import org.apache.paimon.rest.auth.BearTokenAuthProvider;
import org.apache.paimon.rest.auth.RESTAuthParameter;
import org.apache.paimon.rest.exceptions.NotAuthorizedException;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -43,11 +53,13 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
@@ -125,12 +137,7 @@ class RESTCatalogTest extends CatalogTestBase {
@Test
void testRefreshFileIO() throws Exception {
- Options options = new Options();
- options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
- options.set(RESTCatalogOptions.TOKEN, initToken);
- options.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true);
- options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
- this.catalog = new RESTCatalog(CatalogContext.create(options));
+ this.catalog = initDataTokenCatalog();
List<Identifier> identifiers =
Lists.newArrayList(
Identifier.create("test_db_a", "test_table_a"),
@@ -140,29 +147,101 @@ class RESTCatalogTest extends CatalogTestBase {
createTable(identifier, Maps.newHashMap(),
Lists.newArrayList("col1"));
FileStoreTable fileStoreTable = (FileStoreTable)
catalog.getTable(identifier);
assertEquals(true,
fileStoreTable.fileIO().exists(fileStoreTable.location()));
+
+ RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO();
+ RESTToken fileDataToken = fileIO.validToken();
+ RESTToken serverDataToken =
+
restCatalogServer.dataTokenStore.get(identifier.getFullName());
+ assertEquals(serverDataToken, fileDataToken);
}
}
@Test
- void testSnapshotFromREST() throws Catalog.TableNotExistException {
+ void testRefreshFileIOWhenExpired() throws Exception {
+ this.catalog = initDataTokenCatalog();
+ Identifier identifier =
+ Identifier.create("test_data_token",
"table_for_testing_date_token");
+ RESTToken expiredDataToken =
+ new RESTToken(
+ ImmutableMap.of("akId", "akId", "akSecret",
UUID.randomUUID().toString()),
+ System.currentTimeMillis());
+ restCatalogServer.setDataToken(identifier, expiredDataToken);
+ createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+ FileStoreTable fileStoreTable = (FileStoreTable)
catalog.getTable(identifier);
+ RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO();
+ RESTToken fileDataToken = fileIO.validToken();
+ assertEquals(expiredDataToken, fileDataToken);
+ RESTToken newDataToken =
+ new RESTToken(
+ ImmutableMap.of("akId", "akId", "akSecret",
UUID.randomUUID().toString()),
+ System.currentTimeMillis() + 100_000);
+ restCatalogServer.setDataToken(identifier, newDataToken);
+ RESTToken nextFileDataToken = fileIO.validToken();
+ assertEquals(newDataToken, nextFileDataToken);
+ assertEquals(true, nextFileDataToken.expireAtMillis() -
fileDataToken.expireAtMillis() > 0);
+ }
+
+ @Test
+ void testSnapshotFromREST() throws Exception {
Options options = new Options();
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
options.set(RESTCatalogOptions.TOKEN, initToken);
options.set(RESTCatalogOptions.TOKEN_PROVIDER,
AuthProviderEnum.BEAR.identifier());
RESTCatalog catalog = new RESTCatalog(CatalogContext.create(options));
- Identifier hasSnapshotTable = Identifier.create("test_db_a",
"my_snapshot_table");
+ Identifier hasSnapshotTableIdentifier = Identifier.create("test_db_a",
"my_snapshot_table");
+ createTable(hasSnapshotTableIdentifier, Maps.newHashMap(),
Lists.newArrayList("col1"));
long id = 10086;
long millis = System.currentTimeMillis();
- restCatalogServer.setTableSnapshot(hasSnapshotTable,
createSnapshotWithMillis(id, millis));
- Optional<Snapshot> snapshot = catalog.loadSnapshot(hasSnapshotTable);
+ restCatalogServer.setTableSnapshot(
+ hasSnapshotTableIdentifier, createSnapshotWithMillis(id,
millis));
+ Optional<Snapshot> snapshot =
catalog.loadSnapshot(hasSnapshotTableIdentifier);
assertThat(snapshot).isPresent();
assertThat(snapshot.get().id()).isEqualTo(id);
assertThat(snapshot.get().timeMillis()).isEqualTo(millis);
-
- snapshot = catalog.loadSnapshot(Identifier.create("test_db_a",
"unknown"));
+ Identifier noSnapshotTableIdentifier =
Identifier.create("test_db_a_1", "unknown");
+ createTable(noSnapshotTableIdentifier, Maps.newHashMap(),
Lists.newArrayList("col1"));
+ snapshot = catalog.loadSnapshot(noSnapshotTableIdentifier);
assertThat(snapshot).isEmpty();
}
+ @Test
+ public void testBatchRecordsWrite() throws Exception {
+
+ Identifier tableIdentifier = Identifier.create("my_db", "my_table");
+ createTable(tableIdentifier, Maps.newHashMap(),
Lists.newArrayList("col1"));
+ FileStoreTable tableTestWrite = (FileStoreTable)
catalog.getTable(tableIdentifier);
+
+ // write
+ BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();
+ BatchTableWrite write = writeBuilder.newWrite();
+ GenericRow record1 = GenericRow.of(12);
+ GenericRow record2 = GenericRow.of(5);
+ GenericRow record3 = GenericRow.of(18);
+ write.write(record1);
+ write.write(record2);
+ write.write(record3);
+ List<CommitMessage> messages = write.prepareCommit();
+ BatchTableCommit commit = writeBuilder.newCommit();
+ commit.commit(messages);
+ write.close();
+ commit.close();
+
+ // read
+ ReadBuilder readBuilder = tableTestWrite.newReadBuilder();
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ TableRead read = readBuilder.newRead();
+ RecordReader<InternalRow> reader = read.createReader(splits);
+ List<String> actual = new ArrayList<>();
+ reader.forEachRemaining(
+ row -> {
+ String rowStr =
+ String.format("%s[%d]",
row.getRowKind().shortString(), row.getInt(0));
+ actual.add(rowStr);
+ });
+
+ assertThat(actual).containsExactlyInAnyOrder("+I[5]", "+I[12]",
"+I[18]");
+ }
+
@Test
void testBranches() throws Exception {
String databaseName = "testBranchTable";
@@ -199,6 +278,11 @@ class RESTCatalogTest extends CatalogTestBase {
return true;
}
+ // TODO implement this: until then the inherited testTableUUID is overridden with an empty
+ // body, so it runs as a no-op in this class.
+ @Override
+ @Test
+ public void testTableUUID() {}
+
private void createTable(
Identifier identifier, Map<String, String> options, List<String>
partitionKeys)
throws Exception {
@@ -214,8 +298,12 @@ class RESTCatalogTest extends CatalogTestBase {
true);
}
- // TODO implement this
- @Override
- @Test
- public void testTableUUID() {}
+    /**
+     * Builds a {@link RESTCatalog} that points at the test server's URL, authenticates with the
+     * initial token through the BEAR token provider, and has the data-token option enabled.
+     */
+    private Catalog initDataTokenCatalog() {
+        Options catalogOptions = new Options();
+        // Authentication settings.
+        catalogOptions.set(RESTCatalogOptions.TOKEN_PROVIDER, AuthProviderEnum.BEAR.identifier());
+        catalogOptions.set(RESTCatalogOptions.TOKEN, initToken);
+        // Endpoint and data-token switch.
+        catalogOptions.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+        catalogOptions.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true);
+        CatalogContext context = CatalogContext.create(catalogOptions);
+        return new RESTCatalog(context);
+    }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index dc202ec872..cf66ee425b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -105,6 +105,16 @@ class RESTCatalogITCase extends CatalogITCaseBase {
return options;
}
+    /** Inserts two rows with batch SQL and checks that a full-table select returns both. */
+    @Test
+    public void testWriteAndRead() {
+        String insertSql =
+                String.format(
+                        "INSERT INTO %s.%s VALUES ('1', 11), ('2', 22)",
+                        DATABASE_NAME, TABLE_NAME);
+        String selectSql = String.format("SELECT * FROM %s.%s", DATABASE_NAME, TABLE_NAME);
+
+        batchSql(insertSql);
+
+        assertThat(batchSql(selectSql))
+                .containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 22.0D));
+    }
+
@Override
protected String getTempDirPath() {
return this.warehouse;
diff --git a/paimon-spark/paimon-spark-ut/pom.xml
b/paimon-spark/paimon-spark-ut/pom.xml
index 552eb8474f..78127bd46f 100644
--- a/paimon-spark/paimon-spark-ut/pom.xml
+++ b/paimon-spark/paimon-spark-ut/pom.xml
@@ -157,6 +157,13 @@ under the License.
<artifactId>protobuf-java</artifactId>
<version>${protobuf-java.version}</version>
</dependency>
+
+ <!-- NOTE(review): test-scoped; presumably needed by the REST catalog test server used in SparkCatalogWithRestTest. Confirm. -->
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${okhttp.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
new file mode 100644
index 0000000000..cd2acdbe86
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests Spark SQL DDL against a Paimon {@link SparkCatalog} configured with the {@code rest}
+ * metastore.
+ *
+ * <p>Every test gets its own {@link RESTCatalogServer}, created over a temporary warehouse
+ * directory before the test and shut down after it.
+ */
+public class SparkCatalogWithRestTest {
+
+    private RESTCatalogServer restCatalogServer;
+    private String serverUrl;
+    private String warehouse;
+    private final String initToken = "init_token";
+    @TempDir java.nio.file.Path tempFile;
+
+    /** Starts a REST catalog server over the per-test temporary directory and records its URL. */
+    @BeforeEach
+    public void before() throws IOException {
+        warehouse = tempFile.toUri().toString();
+        restCatalogServer = new RESTCatalogServer(warehouse, initToken);
+        restCatalogServer.start();
+        serverUrl = restCatalogServer.getUrl();
+    }
+
+    /** Shuts down the REST catalog server started in {@link #before()}. */
+    @AfterEach()
+    public void after() throws Exception {
+        restCatalogServer.shutdown();
+    }
+
+    /**
+     * Creates a database and a table through Spark SQL, checks that {@code SHOW TABLES} lists the
+     * table, then drops it and checks that the listing is empty.
+     */
+    @Test
+    public void testTable() {
+        SparkSession spark =
+                SparkSession.builder()
+                        .config("spark.sql.catalog.paimon", SparkCatalog.class.getName())
+                        .config("spark.sql.catalog.paimon.metastore", "rest")
+                        .config("spark.sql.catalog.paimon.uri", serverUrl)
+                        .config("spark.sql.catalog.paimon.token", initToken)
+                        .config(
+                                "spark.sql.catalog.paimon.token.provider",
+                                AuthProviderEnum.BEAR.identifier())
+                        .master("local[2]")
+                        .getOrCreate();
+        // Close the session even when a statement or an assertion fails.
+        try {
+            spark.sql("CREATE DATABASE paimon.db2");
+            spark.sql("USE paimon.db2");
+            spark.sql(
+                    "CREATE TABLE t1 (a INT, b INT, c STRING) TBLPROPERTIES"
+                            + " ('primary-key'='a', 'bucket'='4', 'file.format'='avro')");
+            assertThat(
+                            spark.sql("SHOW TABLES").collectAsList().stream()
+                                    .map(s -> s.get(1))
+                                    .map(Object::toString))
+                    .containsExactlyInAnyOrder("t1");
+            spark.sql("DROP TABLE t1");
+            // A bare assertThat(boolean) verifies nothing; assert on the list itself.
+            assertThat(spark.sql("SHOW TABLES").collectAsList()).isEmpty();
+        } finally {
+            spark.close();
+        }
+    }
+}