This is an automated email from the ASF dual-hosted git repository. emaynard pushed a commit to branch revert-1938-pagination-alt-flex in repository https://gitbox.apache.org/repos/asf/polaris.git
commit 276e4183301009c111cad9182d8daccc700dafb2 Author: Eric Maynard <emayn...@apache.org> AuthorDate: Wed Jul 16 10:03:29 2025 -0700 Revert "Extensible pagination token implementation (#1938)" This reverts commit fb418a2613715c219620f69fa4e9b7a1827898be. --- .../apache/polaris/service/it/env/CatalogApi.java | 32 --- .../it/test/PolarisRestCatalogIntegrationBase.java | 73 +----- .../PolarisEclipseLinkMetaStoreSessionImpl.java | 8 +- .../impl/eclipselink/PolarisEclipseLinkStore.java | 18 +- .../relational/jdbc/JdbcBasePersistenceImpl.java | 39 +-- .../relational/jdbc/QueryGenerator.java | 54 +---- .../relational/jdbc/models/ModelEntity.java | 2 - .../relational/jdbc/QueryGeneratorTest.java | 19 +- polaris-core/build.gradle.kts | 8 - .../core/catalog/PolarisCatalogHelpers.java | 27 +-- .../polaris/core/config/FeatureConfiguration.java | 1 - .../AtomicOperationMetaStoreManager.java | 86 ++++--- .../persistence/dao/entity/EntitiesResult.java | 37 ++- .../persistence/dao/entity/ListEntitiesResult.java | 38 ++- .../persistence/pagination/DonePageToken.java} | 37 +-- .../core/persistence/pagination/EntityIdToken.java | 66 ----- .../core/persistence/pagination/HasPageSize.java} | 38 +-- .../persistence/pagination/LimitPageToken.java} | 45 ++-- .../polaris/core/persistence/pagination/Page.java | 90 +------ .../core/persistence/pagination/PageToken.java | 119 ++++----- .../core/persistence/pagination/PageTokenUtil.java | 267 --------------------- .../pagination/ReadEverythingPageToken.java} | 37 +-- .../polaris/core/persistence/pagination/Token.java | 101 -------- .../TransactionalMetaStoreManagerImpl.java | 102 ++++---- .../TreeMapTransactionalPersistenceImpl.java | 25 +- ...ris.core.persistence.pagination.Token$TokenType | 20 -- .../core/persistence/pagination/PageTokenTest.java | 168 ------------- ...ris.core.persistence.pagination.Token$TokenType | 20 -- .../quarkus/catalog/IcebergCatalogTest.java | 130 ---------- .../service/catalog/iceberg/IcebergCatalog.java | 59 +++-- .../catalog/iceberg/IcebergCatalogHandler.java | 22 +- .../persistence/pagination/PageTokenTest.java | 50 ++++ 32 files changed, 428 insertions(+), 1410 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java index 25a77ecbc..eb07d5502 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java @@ -101,24 +101,6 @@ public class CatalogApi extends RestApi { } } - public ListNamespacesResponse listNamespaces( - String catalog, Namespace parent, String pageToken, String pageSize) { - Map<String, String> queryParams = new HashMap<>(); - if (!parent.isEmpty()) { - // TODO change this for Iceberg 1.7.2: - // queryParams.put("parent", RESTUtil.encodeNamespace(parent)); - queryParams.put("parent", Joiner.on('\u001f').join(parent.levels())); - } - queryParams.put("pageToken", pageToken); - queryParams.put("pageSize", pageSize); - try (Response response = - request("v1/{cat}/namespaces", Map.of("cat", catalog), queryParams).get()) { - assertThat(response.getStatus()).isEqualTo(OK.getStatusCode()); - ListNamespacesResponse res = response.readEntity(ListNamespacesResponse.class); - return res; - } - } - public List<Namespace> listAllNamespacesChildFirst(String catalog) { List<Namespace> result = new ArrayList<>(); for (int idx = -1; idx < result.size(); idx++) { @@ -160,20 +142,6 @@ public class CatalogApi extends RestApi { } } - public ListTablesResponse listTables( - String catalog, Namespace namespace, String pageToken, String pageSize) { - String ns = RESTUtil.encodeNamespace(namespace); - Map<String, String> queryParams = new HashMap<>(); - queryParams.put("pageToken", pageToken); - queryParams.put("pageSize", pageSize); - try (Response res = - request("v1/{cat}/namespaces/" + ns + "/tables", Map.of("cat", catalog), queryParams) - .get()) { - assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode()); - return res.readEntity(ListTablesResponse.class); - } - } - public void dropTable(String catalog, TableIdentifier id) { String ns = RESTUtil.encodeNamespace(id.namespace()); try (Response res = diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java index 2bf55cd7e..8ebee36f0 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationBase.java @@ -69,8 +69,6 @@ import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.rest.RESTUtil; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.responses.ErrorResponse; -import org.apache.iceberg.rest.responses.ListNamespacesResponse; -import org.apache.iceberg.rest.responses.ListTablesResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.types.Types; import org.apache.polaris.core.admin.model.Catalog; @@ -163,8 +161,7 @@ public abstract class PolarisRestCatalogIntegrationBase extends CatalogTests<RES private static final String[] DEFAULT_CATALOG_PROPERTIES = { "polaris.config.allow.unstructured.table.location", "true", - "polaris.config.allow.external.table.location", "true", - "polaris.config.list-pagination-enabled", "true" + "polaris.config.allow.external.table.location", "true" }; @Retention(RetentionPolicy.RUNTIME) @@ -2026,72 +2023,4 @@ public abstract class PolarisRestCatalogIntegrationBase extends CatalogTests<RES assertThat(currentETag).isEqualTo(afterDMLETag); // Should match post-DML ETag } } - - @Test - public void testPaginatedListNamespaces() { - String prefix = "testPaginatedListNamespaces"; - for (int i = 0; i < 20; i++) { - Namespace namespace = Namespace.of(prefix + i); - restCatalog.createNamespace(namespace); - } - - try { - Assertions.assertThat(catalogApi.listNamespaces(currentCatalogName, Namespace.empty())) - .hasSize(20); - for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) { - int total = 0; - String pageToken = null; - do { - ListNamespacesResponse response = - catalogApi.listNamespaces( - currentCatalogName, Namespace.empty(), pageToken, String.valueOf(pageSize)); - Assertions.assertThat(response.namespaces().size()).isLessThanOrEqualTo(pageSize); - total += response.namespaces().size(); - pageToken = response.nextPageToken(); - } while (pageToken != null); - Assertions.assertThat(total) - .as("Total paginated results for pageSize = " + pageSize) - .isEqualTo(20); - } - } finally { - for (int i = 0; i < 20; i++) { - Namespace namespace = Namespace.of(prefix + i); - restCatalog.dropNamespace(namespace); - } - } - } - - @Test - public void testPaginatedListTables() { - String prefix = "testPaginatedListTables"; - Namespace namespace = Namespace.of(prefix); - restCatalog.createNamespace(namespace); - for (int i = 0; i < 20; i++) { - restCatalog.createTable(TableIdentifier.of(namespace, prefix + i), SCHEMA); - } - - try { - Assertions.assertThat(catalogApi.listTables(currentCatalogName, namespace)).hasSize(20); - for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) { - int total = 0; - String pageToken = null; - do { - ListTablesResponse response = - catalogApi.listTables( - currentCatalogName, namespace, pageToken, String.valueOf(pageSize)); - Assertions.assertThat(response.identifiers().size()).isLessThanOrEqualTo(pageSize); - total += response.identifiers().size(); - pageToken = response.nextPageToken(); - } while (pageToken != null); - Assertions.assertThat(total) - .as("Total paginated results for pageSize = " + pageSize) - .isEqualTo(20); - } - } finally { - for (int i = 0; i < 20; i++) { - restCatalog.dropTable(TableIdentifier.of(namespace, prefix + i)); - } - restCatalog.dropNamespace(namespace); - } - } } diff --git a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java index 0981275fd..b7b0a951e 100644 --- a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java +++ b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java @@ -56,7 +56,7 @@ import org.apache.polaris.core.exceptions.AlreadyExistsException; import org.apache.polaris.core.persistence.BaseMetaStoreManager; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; -import org.apache.polaris.core.persistence.pagination.EntityIdToken; +import org.apache.polaris.core.persistence.pagination.HasPageSize; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence; @@ -480,7 +480,11 @@ public class PolarisEclipseLinkMetaStoreSessionImpl extends AbstractTransactiona .map(ModelEntity::toEntity) .filter(entityFilter); - return Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity); + if (pageToken instanceof HasPageSize hasPageSize) { + data = data.limit(hasPageSize.getPageSize()); + } + + return Page.fromItems(data.map(transformer).collect(Collectors.toList())); } /** {@inheritDoc} */ diff --git a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java index 60251a7b6..4a889d3c0 100644 --- a/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java +++ b/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java @@ -35,7 +35,6 @@ import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; -import org.apache.polaris.core.persistence.pagination.EntityIdToken; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; @@ -295,17 +294,7 @@ public class PolarisEclipseLinkStore { // Currently check against ENTITIES not joining with ENTITIES_ACTIVE String hql = - "SELECT m from ModelEntity m where" - + " m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode"; - - var entityIdToken = pageToken.valueAs(EntityIdToken.class); - if (entityIdToken.isPresent()) { - hql += " and m.id > :tokenId"; - } - - if (pageToken.paginationRequested()) { - hql += " order by m.id asc"; - } + "SELECT m from ModelEntity m where m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode"; TypedQuery<ModelEntity> query = session @@ -314,11 +303,6 @@ public class PolarisEclipseLinkStore { .setParameter("parentId", parentId) .setParameter("typeCode", entityType.getCode()); - if (entityIdToken.isPresent()) { - long tokenId = entityIdToken.get().entityId(); - query = query.setParameter("tokenId", tokenId); - } - return query.getResultList(); } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index c93765c84..5c3dd1dba 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -32,7 +32,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -54,7 +53,7 @@ import org.apache.polaris.core.persistence.IntegrationPersistence; import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; -import org.apache.polaris.core.persistence.pagination.EntityIdToken; +import org.apache.polaris.core.persistence.pagination.HasPageSize; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; @@ -460,7 +459,7 @@ public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPers @Nonnull Predicate<PolarisBaseEntity> entityFilter, @Nonnull Function<PolarisBaseEntity, T> transformer, @Nonnull PageToken pageToken) { - Map<String, Object> whereEquals = + Map<String, Object> params = Map.of( "catalog_id", catalogId, @@ -470,41 +469,29 @@ public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPers entityType.getCode(), "realm_id", realmId); - Map<String, Object> whereGreater; // Limit can't be pushed down, due to client side filtering // absence of transaction. - String orderByColumnName = null; - if (pageToken.paginationRequested()) { - orderByColumnName = ModelEntity.ID_COLUMN; - whereGreater = - pageToken - .valueAs(EntityIdToken.class) - .map( - entityIdToken -> - Map.<String, Object>of(ModelEntity.ID_COLUMN, entityIdToken.entityId())) - .orElse(Map.of()); - } else { - whereGreater = Map.of(); - } - try { PreparedQuery query = QueryGenerator.generateSelectQuery( - ModelEntity.ALL_COLUMNS, - ModelEntity.TABLE_NAME, - whereEquals, - whereGreater, - orderByColumnName); - AtomicReference<Page<T>> results = new AtomicReference<>(); + ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params); + List<PolarisBaseEntity> results = new ArrayList<>(); datasourceOperations.executeSelectOverStream( query, new ModelEntity(), stream -> { var data = stream.filter(entityFilter); - results.set(Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity)); + if (pageToken instanceof HasPageSize hasPageSize) { + data = data.limit(hasPageSize.getPageSize()); + } + data.forEach(results::add); }); - return results.get(); + List<T> resultsOrEmpty = + results == null + ? Collections.emptyList() + : results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList()); + return Page.fromItems(resultsOrEmpty); } catch (SQLException e) { throw new RuntimeException( String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e); diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java index a06bf283a..c6bad0a1c 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java @@ -20,7 +20,6 @@ package org.apache.polaris.persistence.relational.jdbc; import com.google.common.annotations.VisibleForTesting; import jakarta.annotation.Nonnull; -import jakarta.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -60,27 +59,8 @@ public class QueryGenerator { @Nonnull List<String> projections, @Nonnull String tableName, @Nonnull Map<String, Object> whereClause) { - return generateSelectQuery(projections, tableName, whereClause, Map.of(), null); - } - - /** - * Generates a SELECT query with projection and filtering. - * - * @param projections List of columns to retrieve. - * @param tableName Target table name. - * @param whereEquals Column-value pairs used in WHERE filtering. - * @return A parameterized SELECT query. - * @throws IllegalArgumentException if any whereClause column isn't in projections. - */ - public static PreparedQuery generateSelectQuery( - @Nonnull List<String> projections, - @Nonnull String tableName, - @Nonnull Map<String, Object> whereEquals, - @Nonnull Map<String, Object> whereGreater, - @Nullable String orderByColumn) { - QueryFragment where = - generateWhereClause(new HashSet<>(projections), whereEquals, whereGreater); - PreparedQuery query = generateSelectQuery(projections, tableName, where.sql(), orderByColumn); + QueryFragment where = generateWhereClause(new HashSet<>(projections), whereClause); + PreparedQuery query = generateSelectQuery(projections, tableName, where.sql()); return new PreparedQuery(query.sql(), where.parameters()); } @@ -128,8 +108,7 @@ public class QueryGenerator { params.add(realmId); String where = " WHERE (catalog_id, id) IN (" + placeholders + ") AND realm_id = ?"; return new PreparedQuery( - generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where, null).sql(), - params); + generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where).sql(), params); } /** @@ -178,7 +157,7 @@ public class QueryGenerator { @Nonnull List<Object> values, @Nonnull Map<String, Object> whereClause) { List<Object> bindingParams = new ArrayList<>(values); - QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause, Map.of()); + QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause); String setClause = allColumns.stream().map(c -> c + " = ?").collect(Collectors.joining(", ")); String sql = "UPDATE " + getFullyQualifiedTableName(tableName) + " SET " + setClause + where.sql(); @@ -198,49 +177,34 @@ public class QueryGenerator { @Nonnull List<String> tableColumns, @Nonnull String tableName, @Nonnull Map<String, Object> whereClause) { - QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause, Map.of()); + QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause); return new PreparedQuery( "DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(), where.parameters()); } private static PreparedQuery generateSelectQuery( - @Nonnull List<String> columnNames, - @Nonnull String tableName, - @Nonnull String filter, - @Nullable String orderByColumn) { + @Nonnull List<String> columnNames, @Nonnull String tableName, @Nonnull String filter) { String sql = "SELECT " + String.join(", ", columnNames) + " FROM " + getFullyQualifiedTableName(tableName) + filter; - if (orderByColumn != null) { - sql += " ORDER BY " + orderByColumn + " ASC"; - } return new PreparedQuery(sql, Collections.emptyList()); } @VisibleForTesting static QueryFragment generateWhereClause( - @Nonnull Set<String> tableColumns, - @Nonnull Map<String, Object> whereEquals, - @Nonnull Map<String, Object> whereGreater) { + @Nonnull Set<String> tableColumns, @Nonnull Map<String, Object> whereClause) { List<String> conditions = new ArrayList<>(); List<Object> parameters = new ArrayList<>(); - for (Map.Entry<String, Object> entry : whereEquals.entrySet()) { + for (Map.Entry<String, Object> entry : whereClause.entrySet()) { if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) { throw new IllegalArgumentException("Invalid query column: " + entry.getKey()); } conditions.add(entry.getKey() + " = ?"); parameters.add(entry.getValue()); } - for (Map.Entry<String, Object> entry : whereGreater.entrySet()) { - if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) { - throw new IllegalArgumentException("Invalid query column: " + entry.getKey()); - } - conditions.add(entry.getKey() + " > ?"); - parameters.add(entry.getValue()); - } String clause = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions); return new QueryFragment(clause, parameters); } @@ -294,7 +258,7 @@ public class QueryGenerator { QueryFragment where = new QueryFragment(clause, finalParams); PreparedQuery query = - generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where.sql(), null); + generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where.sql()); return new PreparedQuery(query.sql(), where.parameters()); } diff --git a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java index 6eaec072d..e9a2bdb55 100644 --- a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java +++ b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/models/ModelEntity.java @@ -33,8 +33,6 @@ import org.apache.polaris.persistence.relational.jdbc.DatabaseType; public class ModelEntity implements Converter<PolarisBaseEntity> { public static final String TABLE_NAME = "ENTITIES"; - public static final String ID_COLUMN = "id"; - public static final List<String> ALL_COLUMNS = List.of( "id", diff --git a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java index 6df78eff5..798dd92e7 100644 --- a/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java +++ b/persistence/relational-jdbc/src/test/java/org/apache/polaris/persistence/relational/jdbc/QueryGeneratorTest.java @@ -184,8 +184,7 @@ public class QueryGeneratorTest { Map<String, Object> whereClause = new HashMap<>(); whereClause.put("name", "test"); assertEquals( - " WHERE name = ?", - QueryGenerator.generateWhereClause(Set.of("name"), whereClause, Map.of()).sql()); + " WHERE name = ?", QueryGenerator.generateWhereClause(Set.of("name"), whereClause).sql()); } @Test @@ -195,25 +194,13 @@ public class QueryGeneratorTest { whereClause.put("version", 1); assertEquals( " WHERE name = ? AND version = ?", - QueryGenerator.generateWhereClause(Set.of("name", "version"), whereClause, Map.of()).sql()); - } - - @Test - void testGenerateWhereClause_multipleConditions_AndInequality() { - Map<String, Object> whereClause = new HashMap<>(); - whereClause.put("name", "test"); - whereClause.put("version", 1); - assertEquals( - " WHERE name = ? AND version = ? AND id > ?", - QueryGenerator.generateWhereClause( - Set.of("name", "version", "id"), whereClause, Map.of("id", 123)) - .sql()); + QueryGenerator.generateWhereClause(Set.of("name", "version"), whereClause).sql()); } @Test void testGenerateWhereClause_emptyMap() { Map<String, Object> whereClause = Collections.emptyMap(); - assertEquals("", QueryGenerator.generateWhereClause(Set.of(), whereClause, Map.of()).sql()); + assertEquals("", QueryGenerator.generateWhereClause(Set.of(), whereClause).sql()); } @Test diff --git a/polaris-core/build.gradle.kts b/polaris-core/build.gradle.kts index 5f8b1ce3e..ca24aeecd 100644 --- a/polaris-core/build.gradle.kts +++ b/polaris-core/build.gradle.kts @@ -36,11 +36,6 @@ dependencies { implementation("com.fasterxml.jackson.core:jackson-annotations") implementation("com.fasterxml.jackson.core:jackson-core") implementation("com.fasterxml.jackson.core:jackson-databind") - implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-smile") - runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-guava") - runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-jdk8") - runtimeOnly("com.fasterxml.jackson.datatype:jackson-datatype-jsr310") - implementation(libs.caffeine) implementation(libs.commons.lang3) implementation(libs.commons.codec1) @@ -101,9 +96,6 @@ dependencies { implementation(platform(libs.google.cloud.storage.bom)) implementation("com.google.cloud:google-cloud-storage") - testCompileOnly(project(":polaris-immutables")) - testAnnotationProcessor(project(":polaris-immutables", configuration = "processor")) - testFixturesApi("com.fasterxml.jackson.core:jackson-core") testFixturesApi("com.fasterxml.jackson.core:jackson-databind") testFixturesApi(libs.commons.lang3) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java index b37efaae3..e10c24f2f 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java @@ -19,6 +19,7 @@ package org.apache.polaris.core.catalog; import com.google.common.collect.ImmutableList; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -61,28 +62,20 @@ public class PolarisCatalogHelpers { return Namespace.of(parentLevels); } - public static Namespace nameAndIdToNamespace( - List<PolarisEntity> catalogPath, PolarisEntity.NameAndId entity) { - // Skip element 0 which is the catalog entity - String[] fullName = new String[catalogPath.size()]; - for (int i = 0; i < fullName.length - 1; ++i) { - fullName[i] = catalogPath.get(i + 1).getName(); - } - fullName[fullName.length - 1] = entity.getName(); - return Namespace.of(fullName); - } - - /** - * Given the shortnames/ids of entities that all live under the given catalogPath, reconstructs - * TableIdentifier objects for each that all hold the catalogPath excluding the catalog entity. - */ - public static Namespace parentNamespace(List<PolarisEntity> catalogPath) { + public static List<Namespace> nameAndIdToNamespaces( + List<PolarisEntity> catalogPath, List<PolarisEntity.NameAndId> entities) { // Skip element 0 which is the catalog entity String[] parentNamespaces = new String[catalogPath.size() - 1]; for (int i = 0; i < parentNamespaces.length; ++i) { parentNamespaces[i] = catalogPath.get(i + 1).getName(); } - return Namespace.of(parentNamespaces); + List<Namespace> namespaces = new ArrayList<>(); + for (PolarisEntity.NameAndId entity : entities) { + String[] fullName = Arrays.copyOf(parentNamespaces, parentNamespaces.length + 1); + fullName[fullName.length - 1] = entity.getName(); + namespaces.add(Namespace.of(fullName)); + } + return namespaces; } /** diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index 05b7e0dec..2f69e898e 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -222,7 +222,6 @@ public class FeatureConfiguration<T> extends PolarisConfiguration<T> { public static final PolarisConfiguration<Boolean> LIST_PAGINATION_ENABLED = PolarisConfiguration.<Boolean>builder() .key("LIST_PAGINATION_ENABLED") - .catalogConfig("polaris.config.list-pagination-enabled") .description("If set to true, pagination for APIs like listTables is enabled.") .defaultValue(false) .buildFeatureConfiguration(); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index e4e77c155..08bda144c 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -31,7 +31,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.AsyncTaskType; @@ -699,20 +698,23 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { // return list of active entities // TODO: Clean up shared logic for catalogId/parentId - long catalogId = catalogPath == null || catalogPath.isEmpty() ? 0L : catalogPath.get(0).getId(); + long catalogId = + catalogPath == null || catalogPath.size() == 0 ? 0l : catalogPath.get(0).getId(); long parentId = - catalogPath == null || catalogPath.isEmpty() - ? 0L + catalogPath == null || catalogPath.size() == 0 + ? 0l : catalogPath.get(catalogPath.size() - 1).getId(); + Page<EntityNameLookupRecord> resultPage = + ms.listEntities(callCtx, catalogId, parentId, entityType, pageToken); // prune the returned list with only entities matching the entity subtype - Predicate<PolarisBaseEntity> filter = - entitySubType != PolarisEntitySubType.ANY_SUBTYPE - ? e -> e.getSubTypeCode() == entitySubType.getCode() - : entity -> true; - - Page<EntityNameLookupRecord> resultPage = - ms.listEntities(callCtx, catalogId, parentId, entityType, filter, pageToken); + if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) { + resultPage = + pageToken.buildNextPage( + resultPage.items.stream() + .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) + .collect(Collectors.toList())); + } // TODO: Use post-validation to enforce consistent view against catalogPath. In the // meantime, happens-before ordering semantics aren't guaranteed during high-concurrency @@ -955,7 +957,7 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { e.getExistingEntity().getSubTypeCode())); } - return new EntitiesResult(Page.fromItems(createdEntities)); + return new EntitiesResult(createdEntities); } /** {@inheritDoc} */ @@ -1018,7 +1020,7 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { } // good, all success - return new EntitiesResult(Page.fromItems(updatedEntities)); + return new EntitiesResult(updatedEntities); } /** {@inheritDoc} */ @@ -1183,7 +1185,7 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { entity -> true, Function.identity(), PageToken.fromLimit(2)) - .items(); + .items; // if we have 2, we cannot drop the catalog. If only one left, better be the admin role if (catalogRoles.size() > 1) { @@ -1517,38 +1519,32 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { Function.identity(), pageToken); + List<PolarisBaseEntity> loadedTasks = new ArrayList<>(); final AtomicInteger failedLeaseCount = new AtomicInteger(0); - List<PolarisBaseEntity> loadedTasks = - availableTasks.items().stream() - .map( - task -> { - PolarisBaseEntity.Builder updatedTaskBuilder = - new PolarisBaseEntity.Builder(task); - Map<String, String> properties = - PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); - properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); - properties.put( - PolarisTaskConstants.LAST_ATTEMPT_START_TIME, - String.valueOf(callCtx.getClock().millis())); - properties.put( - PolarisTaskConstants.ATTEMPT_COUNT, - String.valueOf( - Integer.parseInt( - properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) - + 1)); - updatedTaskBuilder.properties( - PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); - EntityResult result = - updateEntityPropertiesIfNotChanged(callCtx, null, updatedTaskBuilder.build()); - if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { - return result.getEntity(); - } else { - failedLeaseCount.getAndIncrement(); - return null; - } - }) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + availableTasks.items.forEach( + task -> { + PolarisBaseEntity.Builder updatedTaskBuilder = new PolarisBaseEntity.Builder(task); + Map<String, String> properties = + PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); + properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); + properties.put( + PolarisTaskConstants.LAST_ATTEMPT_START_TIME, + String.valueOf(callCtx.getClock().millis())); + properties.put( + PolarisTaskConstants.ATTEMPT_COUNT, + String.valueOf( + Integer.parseInt(properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) + + 1)); + updatedTaskBuilder.properties( + PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); + EntityResult result = + updateEntityPropertiesIfNotChanged(callCtx, null, updatedTaskBuilder.build()); + if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { + loadedTasks.add(result.getEntity()); + } else { + failedLeaseCount.getAndIncrement(); + } + }); // Since the contract of this method is to only return an empty list once no available tasks // are found anymore, if we happen to fail to lease any tasks at all due to all of them diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java index 13c7422f0..e27b69680 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java @@ -18,20 +18,25 @@ */ package org.apache.polaris.core.persistence.dao.entity; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.List; +import java.util.Optional; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; /** a set of returned entities result */ public class EntitiesResult extends BaseResult { // null if not success. Else the list of entities being returned - private final Page<PolarisBaseEntity> entities; + private final List<PolarisBaseEntity> entities; + private final Optional<PageToken> pageTokenOpt; public static EntitiesResult fromPage(Page<PolarisBaseEntity> page) { - return new EntitiesResult(page); + return new EntitiesResult(page.items, Optional.ofNullable(page.pageToken)); } /** @@ -43,6 +48,11 @@ public class EntitiesResult extends BaseResult { public EntitiesResult(@Nonnull ReturnStatus errorStatus, @Nullable String extraInformation) { super(errorStatus, extraInformation); this.entities = null; + this.pageTokenOpt = Optional.empty(); + } + + public EntitiesResult(@Nonnull List<PolarisBaseEntity> entities) { + this(entities, Optional.empty()); } /** @@ -50,12 +60,29 @@ public class EntitiesResult extends BaseResult { * * @param entities list of entities being returned, implies success */ - public EntitiesResult(@Nonnull Page<PolarisBaseEntity> entities) { + public EntitiesResult( + @Nonnull List<PolarisBaseEntity> entities, @Nonnull Optional<PageToken> pageTokenOpt) { super(ReturnStatus.SUCCESS); this.entities = entities; + this.pageTokenOpt = pageTokenOpt; + } + + @JsonCreator + private EntitiesResult( + @JsonProperty("returnStatus") @Nonnull ReturnStatus returnStatus, + @JsonProperty("extraInformation") String extraInformation, + @JsonProperty("entities") List<PolarisBaseEntity> entities, + @JsonProperty("pageToken") Optional<PageToken> pageTokenOpt) { + super(returnStatus, extraInformation); + this.entities = entities; + this.pageTokenOpt = pageTokenOpt; + } + + public List<PolarisBaseEntity> getEntities() { + return entities; } - public @Nullable List<PolarisBaseEntity> getEntities() { - return entities == null ? null : entities.items(); + public Optional<PageToken> getPageToken() { + return pageTokenOpt; } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java index a7a51d229..10669e899 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java @@ -18,21 +18,26 @@ */ package org.apache.polaris.core.persistence.dao.entity; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.List; +import java.util.Optional; import org.apache.polaris.core.entity.EntityNameLookupRecord; import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; /** the return the result for a list entities call */ public class ListEntitiesResult extends BaseResult { // null if not success. Else the list of entities being returned - private final Page<EntityNameLookupRecord> entities; + private final List<EntityNameLookupRecord> entities; + private final Optional<PageToken> pageTokenOpt; /** Create a {@link ListEntitiesResult} from a {@link Page} */ public static ListEntitiesResult fromPage(Page<EntityNameLookupRecord> page) { - return new ListEntitiesResult(page); + return new ListEntitiesResult(page.items, Optional.ofNullable(page.pageToken)); } /** @@ -41,9 +46,13 @@ public class ListEntitiesResult extends BaseResult { * @param errorCode error code, cannot be SUCCESS * @param extraInformation extra information */ - public ListEntitiesResult(@Nonnull ReturnStatus errorCode, @Nullable String extraInformation) { + public ListEntitiesResult( + @Nonnull ReturnStatus errorCode, + @Nullable String extraInformation, + @Nonnull Optional<PageToken> pageTokenOpt) { super(errorCode, extraInformation); this.entities = null; + this.pageTokenOpt = pageTokenOpt; } /** @@ -51,16 +60,29 @@ public class ListEntitiesResult extends BaseResult { * * @param entities list of entities being returned, implies success */ - public ListEntitiesResult(Page<EntityNameLookupRecord> entities) { + public ListEntitiesResult( + @Nonnull List<EntityNameLookupRecord> entities, @Nonnull Optional<PageToken> pageTokenOpt) { super(ReturnStatus.SUCCESS); this.entities = entities; + this.pageTokenOpt = pageTokenOpt; } - public @Nullable List<EntityNameLookupRecord> getEntities() { - return entities == null ? null : entities.items(); + @JsonCreator + private ListEntitiesResult( + @JsonProperty("returnStatus") @Nonnull ReturnStatus returnStatus, + @JsonProperty("extraInformation") String extraInformation, + @JsonProperty("entities") List<EntityNameLookupRecord> entities, + @JsonProperty("pageToken") Optional<PageToken> pageTokenOpt) { + super(returnStatus, extraInformation); + this.entities = entities; + this.pageTokenOpt = pageTokenOpt; + } + + public List<EntityNameLookupRecord> getEntities() { + return entities; } - public Page<EntityNameLookupRecord> getPage() { - return entities == null ? Page.fromItems(List.of()) : entities; + public Optional<PageToken> getPageToken() { + return pageTokenOpt; } } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java similarity index 54% copy from polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java copy to polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java index 5053ac3ce..d46ea7b02 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java @@ -18,36 +18,23 @@ */ package org.apache.polaris.core.persistence.pagination; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import java.util.Optional; -import java.util.OptionalInt; -import org.apache.polaris.immutables.PolarisImmutable; +import java.util.List; -@PolarisImmutable -@JsonSerialize(as = ImmutableDummyTestToken.class) -@JsonDeserialize(as = ImmutableDummyTestToken.class) -public interface DummyTestToken extends Token { - String ID = "test-dummy"; - - Optional<String> s(); +/** + * A {@link PageToken} string that represents the lack of a page token. Returns `null` in + * `toTokenString`, which the client will interpret as there being no more data available. + */ +public class DonePageToken extends PageToken { - OptionalInt i(); + public DonePageToken() {} @Override - default String getT() { - return ID; + public String toTokenString() { + return null; } - final class DummyTestTokenType implements TokenType { - @Override - public String id() { - return ID; - } - - @Override - public Class<? extends Token> javaType() { - return DummyTestToken.class; - } + @Override + protected PageToken updated(List<?> newData) { + throw new IllegalStateException("DonePageToken.updated is invalid"); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdToken.java deleted file mode 100644 index 8a9a03b1b..000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/EntityIdToken.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.polaris.core.persistence.pagination; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import jakarta.annotation.Nullable; -import org.apache.polaris.core.entity.PolarisBaseEntity; -import org.apache.polaris.immutables.PolarisImmutable; - -/** Pagination {@linkplain Token token} backed by {@link PolarisBaseEntity#getId() entity ID}. */ -@PolarisImmutable -@JsonSerialize(as = ImmutableEntityIdToken.class) -@JsonDeserialize(as = ImmutableEntityIdToken.class) -public interface EntityIdToken extends Token { - String ID = "e"; - - @JsonProperty("i") - long entityId(); - - @Override - default String getT() { - return ID; - } - - static @Nullable EntityIdToken fromEntity(PolarisBaseEntity entity) { - if (entity == null) { - return null; - } - return fromEntityId(entity.getId()); - } - - static EntityIdToken fromEntityId(long entityId) { - return ImmutableEntityIdToken.builder().entityId(entityId).build(); - } - - final class EntityIdTokenType implements TokenType { - @Override - public String id() { - return ID; - } - - @Override - public Class<? extends Token> javaType() { - return EntityIdToken.class; - } - } -} diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java similarity index 52% copy from polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java copy to polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java index 5053ac3ce..c6b216fcd 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java @@ -18,36 +18,10 @@ */ package org.apache.polaris.core.persistence.pagination; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import java.util.Optional; -import java.util.OptionalInt; -import org.apache.polaris.immutables.PolarisImmutable; - -@PolarisImmutable -@JsonSerialize(as = ImmutableDummyTestToken.class) -@JsonDeserialize(as = ImmutableDummyTestToken.class) -public interface DummyTestToken extends Token { - String ID = "test-dummy"; - - Optional<String> s(); - - OptionalInt i(); - - @Override - default String getT() { - return ID; - } - - final class DummyTestTokenType implements TokenType { - @Override - public String id() { - return ID; - } - - @Override - public Class<? extends Token> javaType() { - return DummyTestToken.class; - } - } +/** + * A light interface for {@link PageToken} implementations to express that they have a page size + * that should be respected + */ +public interface HasPageSize { + int getPageSize(); } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java similarity index 54% copy from polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java copy to polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java index 5053ac3ce..18586446c 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java @@ -18,36 +18,35 @@ */ package org.apache.polaris.core.persistence.pagination; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import java.util.Optional; -import java.util.OptionalInt; -import org.apache.polaris.immutables.PolarisImmutable; +import java.util.List; -@PolarisImmutable -@JsonSerialize(as = ImmutableDummyTestToken.class) -@JsonDeserialize(as = ImmutableDummyTestToken.class) -public interface DummyTestToken extends Token { - String ID = "test-dummy"; +/** + * A {@link PageToken} implementation that has a page size, but no start offset. This can be used to + * represent a `limit`. When updated, it returns {@link DonePageToken}. As such it should never be + * user-facing and doesn't truly paginate. + */ +public class LimitPageToken extends PageToken implements HasPageSize { + + public static final String PREFIX = "limit"; - Optional<String> s(); + private final int pageSize; - OptionalInt i(); + public LimitPageToken(int pageSize) { + this.pageSize = pageSize; + } @Override - default String getT() { - return ID; + public int getPageSize() { + return pageSize; } - final class DummyTestTokenType implements TokenType { - @Override - public String id() { - return ID; - } + @Override + public String toTokenString() { + return String.format("%s/%d", PREFIX, pageSize); + } - @Override - public Class<? extends Token> javaType() { - return DummyTestToken.class; - } + @Override + protected PageToken updated(List<?> newData) { + return new DonePageToken(); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java index 4a3de4d12..18287f85c 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java @@ -18,99 +18,25 @@ */ package org.apache.polaris.core.persistence.pagination; -import static java.util.Spliterators.iterator; - -import jakarta.annotation.Nullable; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** - * An immutable page of items plus the next-page token value, if there are more items. The {@link - * #encodedResponseToken()} here can be used to continue the listing operation that generated the - * `items`. + * An immutable page of items plus their paging cursor. The {@link PageToken} here can be used to + * continue the listing operation that generated the `items`. */ public class Page<T> { - private final PageToken request; - private final List<T> items; - @Nullable private final Token nextToken; + public final PageToken pageToken; + public final List<T> items; - private Page(PageToken request, @Nullable Token nextToken, List<T> items) { - this.request = request; - this.nextToken = nextToken; + public Page(PageToken pageToken, List<T> items) { + this.pageToken = pageToken; this.items = items; } /** - * Builds a complete response page for the full list of relevant items. No subsequence pages of - * related data exist. + * Used to wrap a {@link List<T>} of items into a {@link Page <T>} when there are no more pages */ public static <T> Page<T> fromItems(List<T> items) { - return new Page<>(PageToken.readEverything(), null, items); - } - - /** - * Produces a response page by consuming the number of items from the provided stream according to - * the {@code request} parameter. Source items can be converted to a different type by providing a - * {@code mapper} function. The page token for the response will be produced from the request data - * combined with the pointer to the next page of data provided by the {@code dataPointer} - * function. - * - * @param request defines pagination parameters that were uses to produce this page of data. - * @param items stream of source data - * @param mapper converter from source data types to response data types. - * @param tokenBuilder determines the {@link Token} used to start the next page of data given the - * last item from the previous page. The output of this function will be available from {@link - * PageToken#value()} associated with the request for the next page. - */ - public static <R, T> Page<R> mapped( - PageToken request, Stream<T> items, Function<T, R> mapper, Function<T, Token> tokenBuilder) { - List<R> data; - T last = null; - if (!request.paginationRequested()) { - // short-cut for "no pagination" - data = items.map(mapper).collect(Collectors.toList()); - } else { - data = new ArrayList<>(request.pageSize().orElse(10)); - - Iterator<T> it = iterator(items.spliterator()); - int limit = request.pageSize().orElse(Integer.MAX_VALUE); - while (it.hasNext() && data.size() < limit) { - last = it.next(); - data.add(mapper.apply(last)); - } - - // Signal "no more data" if the number of items is less than the requested page size or if - // there is no more data. - if (data.size() < limit || !it.hasNext()) { - last = null; - } - } - - return new Page<>(request, tokenBuilder.apply(last), data); - } - - public List<T> items() { - return items; - } - - /** - * Returns a page token in encoded form suitable for returning to API clients. The string returned - * from this method is expected to be parsed by {@link PageToken#build(String, Integer)} when - * servicing the request for the next page of related data. - */ - public @Nullable String encodedResponseToken() { - return PageTokenUtil.encodePageToken(request, nextToken); - } - - /** - * Converts this page of data to objects of a different type, while maintaining the underlying - * pointer to the next page of source data. - */ - public <R> Page<R> map(Function<T, R> mapper) { - return new Page<>(request, nextToken, items.stream().map(mapper).collect(Collectors.toList())); + return new Page<>(new DonePageToken(), items); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java index a1dceffdd..2e335ccd4 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java @@ -18,75 +18,82 @@ */ package org.apache.polaris.core.persistence.pagination; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import jakarta.annotation.Nullable; -import java.util.Optional; -import java.util.OptionalInt; -import org.apache.polaris.immutables.PolarisImmutable; +import java.util.List; +import java.util.Objects; -/** A wrapper for pagination information passed in as part of a request. */ -@PolarisImmutable -@JsonSerialize(as = ImmutablePageToken.class) -@JsonDeserialize(as = ImmutablePageToken.class) -public interface PageToken { - // Serialization property names are intentionally short to reduce the size of the serialized - // paging token. +/** + * Represents a page token that can be used by operations like `listTables`. Clients that specify a + * `pageSize` (or a `pageToken`) may receive a `next-page-token` in the response, the content of + * which is a serialized PageToken. + * + * <p>By providing that in the next query's `pageToken`, the client can resume listing where they + * left off. If the client provides a `pageToken` or `pageSize` but `next-page-token` is null in the + * response, that means there is no more data to read. + */ +public abstract class PageToken { - /** The requested page size (optional). */ - @JsonProperty("p") - OptionalInt pageSize(); + /** Build a new PageToken that reads everything */ + public static PageToken readEverything() { + return build(null, null); + } - /** Convenience for {@code pageSize().isPresent()}. */ - default boolean paginationRequested() { - return pageSize().isPresent(); + /** Build a new PageToken from an input String, without a specified page size */ + public static PageToken fromString(String token) { + return build(token, null); } - /** - * Paging token value, if present. Serialized paging tokens always have a value, but "synthetic" - * paging tokens like {@link #readEverything()} or {@link #fromLimit(int)} do not have a token - * value. - */ - @JsonProperty("v") - Optional<Token> value(); + /** Build a new PageToken from a limit */ + public static PageToken fromLimit(Integer pageSize) { + return build(null, pageSize); + } - // Note: another property can be added to contain a (cryptographic) signature, if we want to - // ensure that a paging-token hasn't been tampered. + /** Build a {@link PageToken} from the input string and page size */ + public static PageToken build(String token, Integer pageSize) { + if (token == null || token.isEmpty()) { + if (pageSize != null) { + return new LimitPageToken(pageSize); + } else { + return new ReadEverythingPageToken(); + } + } else { + // TODO implement, split out by the token's prefix + throw new IllegalArgumentException("Unrecognized page token: " + token); + } + } + + /** Serialize a {@link PageToken} into a string */ + public abstract String toTokenString(); /** - * Paging token value, if it is present and an instance of the given {@code type}. This is a - * convenience to prevent duplication of type casts. + * Builds a new page token to reflect new data that's been read. If the amount of data read is + * less than the pageSize, this will return a {@link DonePageToken} */ - default <T extends Token> Optional<T> valueAs(Class<T> type) { - return value() - .flatMap( - t -> - type.isAssignableFrom(t.getClass()) ? Optional.of(type.cast(t)) : Optional.empty()); - } + protected abstract PageToken updated(List<?> newData); - /** Represents a non-paginated request. */ - static PageToken readEverything() { - return PageTokenUtil.READ_EVERYTHING; + /** + * Builds a {@link Page <T>} from a {@link List<T>}. The {@link PageToken} attached to the new + * {@link Page <T>} is the same as the result of calling {@link #updated(List)} on this {@link + * PageToken}. + */ + public final <T> Page<T> buildNextPage(List<T> data) { + return new Page<T>(updated(data), data); } - /** Represents a request to start paginating with a particular page size. */ - static PageToken fromLimit(int limit) { - return PageTokenUtil.fromLimit(limit); + @Override + public final boolean equals(Object o) { + if (o instanceof PageToken) { + return Objects.equals(this.toTokenString(), ((PageToken) o).toTokenString()); + } else { + return false; + } } - /** - * Reconstructs a page token from the API-level page token string (returned to the client in the - * response to a previous request for similar data) and an API-level new requested page size. - * - * @param serializedPageToken page token from the {@link Page#encodedResponseToken() previous - * page} - * @param requestedPageSize optional page size for the next page. If not set, the page size of the - * previous page (encoded in the page token string) will be reused. - * @see Page#encodedResponseToken() - */ - static PageToken build( - @Nullable String serializedPageToken, @Nullable Integer requestedPageSize) { - return PageTokenUtil.decodePageRequest(serializedPageToken, requestedPageSize); + @Override + public final int hashCode() { + if (toTokenString() == null) { + return 0; + } else { + return toTokenString().hashCode(); + } } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageTokenUtil.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageTokenUtil.java deleted file mode 100644 index 8a811f41c..000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageTokenUtil.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.polaris.core.persistence.pagination; - -import static com.google.common.base.Preconditions.checkArgument; -import static java.lang.String.format; -import static java.util.Collections.unmodifiableMap; - -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DatabindContext; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; -import com.fasterxml.jackson.dataformat.smile.databind.SmileMapper; -import com.google.common.annotations.VisibleForTesting; -import jakarta.annotation.Nullable; -import java.io.IOException; -import java.util.Base64; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; -import java.util.OptionalInt; -import java.util.ServiceLoader; - -final class PageTokenUtil { - - private static final ObjectMapper SMILE_MAPPER = new SmileMapper().findAndRegisterModules(); - - /** Constant for {@link PageToken#readEverything()}. */ - static final PageToken READ_EVERYTHING = - new PageToken() { - @Override - public OptionalInt pageSize() { - return OptionalInt.empty(); - } - - @Override - public Optional<Token> value() { - return Optional.empty(); - } - - @Override - public int hashCode() { - return 1; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof PageToken)) { - return false; - } - PageToken other = (PageToken) obj; - return other.pageSize().isEmpty() && other.value().isEmpty(); - } - - @Override - public String toString() { - return "PageToken(everything)"; - } - }; - - static PageToken fromLimit(int limit) { - return new PageToken() { - @Override - public OptionalInt pageSize() { - return OptionalInt.of(limit); - } - - @Override - public Optional<Token> value() { - return Optional.empty(); - } - - @Override - public int hashCode() { - return 2; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof PageToken)) { - return false; - } - PageToken other = (PageToken) obj; - return other.pageSize().equals(pageSize()) && other.value().isEmpty(); - } - - @Override - public String toString() { - return "PageToken(limit = " + limit + ")"; - } - }; - } - - private PageTokenUtil() {} - - /** - * Decodes a {@link PageToken} from API request parameters for the page-size and a serialized page - * token. - */ - static PageToken decodePageRequest( - @Nullable String requestedPageToken, @Nullable Integer requestedPageSize) { - if (requestedPageToken != null) { - var bytes = Base64.getUrlDecoder().decode(requestedPageToken); - try { - var pageToken = SMILE_MAPPER.readValue(bytes, PageToken.class); - if (requestedPageSize != null) { - int pageSizeInt = requestedPageSize; - checkArgument(pageSizeInt >= 0, "Invalid page size"); - if (pageToken.pageSize().orElse(-1) != pageSizeInt) { - pageToken = ImmutablePageToken.builder().from(pageToken).pageSize(pageSizeInt).build(); - } - } - return pageToken; - } catch (IOException e) { - throw new RuntimeException(e); - } - } else if (requestedPageSize != null) { - int pageSizeInt = requestedPageSize; - checkArgument(pageSizeInt >= 0, "Invalid page size"); - return fromLimit(pageSizeInt); - } else { - return READ_EVERYTHING; - } - } - - /** - * Returns the encoded ({@link String} serialized) {@link PageToken} built from the given {@link - * PageToken currentPageToken}, the page token of the current request, and {@link Token - * nextToken}, the token for the next page. - * - * @param currentPageToken page token of the currently handled API request, must not be {@code - * null} - * @param nextToken token for the next page, can be {@code null}, in which case the result will be - * {@code null} - * @return base-64/url-encoded serialized {@link PageToken} for the next page. - */ - static @Nullable String encodePageToken(PageToken currentPageToken, @Nullable Token nextToken) { - if (nextToken == null) { - return null; - } - - return serializePageToken( - ImmutablePageToken.builder() - .pageSize(currentPageToken.pageSize()) - .value(nextToken) - .build()); - } - - /** - * Serializes the given {@link PageToken pageToken} - * - * @return base-64/url-encoded serialized {@link PageToken} for the next page. - */ - @VisibleForTesting - static @Nullable String serializePageToken(PageToken pageToken) { - if (pageToken == null) { - return null; - } - - try { - var serialized = SMILE_MAPPER.writeValueAsBytes(pageToken); - return Base64.getUrlEncoder().encodeToString(serialized); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - /** Lazily initialized registry of all token-types. */ - private static final class Registry { - private static final Map<String, Token.TokenType> BY_ID; - - static { - var byId = new HashMap<String, Token.TokenType>(); - var loader = ServiceLoader.load(Token.TokenType.class); - loader.stream() - .map(ServiceLoader.Provider::get) - .forEach( - tokenType -> { - var ex = byId.put(tokenType.id(), tokenType); - if (ex != null) { - throw new IllegalStateException( - format("Duplicate token type ID: from %s and %s", tokenType, ex)); - } - }); - BY_ID = unmodifiableMap(byId); - } - } - - /** - * Jackson type-id resolver, resolves a {@link Token#getT() token type value} to a concrete Java - * type, consulting the {@link Registry}. - */ - static final class TokenTypeIdResolver extends TypeIdResolverBase { - private JavaType baseType; - - public TokenTypeIdResolver() {} - - @Override - public void init(JavaType bt) { - baseType = bt; - } - - @Override - public String idFromValue(Object value) { - return getId(value); - } - - @Override - public String idFromValueAndType(Object value, Class<?> suggestedType) { - return getId(value); - } - - @Override - public JsonTypeInfo.Id getMechanism() { - return JsonTypeInfo.Id.CUSTOM; - } - - private String getId(Object value) { - if (value instanceof Token) { - return ((Token) value).getT(); - } - - return null; - } - - @Override - public JavaType typeFromId(DatabindContext context, String id) { - var idLower = id.toLowerCase(Locale.ROOT); - var asType = Registry.BY_ID.get(idLower); - if (asType == null) { - throw new IllegalStateException("Cannot deserialize paging token value of type " + idLower); - } - if (baseType.getRawClass().isAssignableFrom(asType.javaType())) { - return context.constructSpecializedType(baseType, asType.javaType()); - } - - // This is rather a "test-only" code path, but it might happen in real life as well, when - // calling the ObjectMapper with a "too specific" type and not just Change.class. - // So we can get here for example, if the baseType (induced by the type passed to - // ObjectMapper), is GenericChange.class, but the type is a "well known" type like - // ChangeRename.class. - @SuppressWarnings("unchecked") - var concrete = (Class<? extends Token>) baseType.getRawClass(); - return context.constructSpecializedType(baseType, concrete); - } - } -} diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java similarity index 54% rename from polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java rename to polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java index 5053ac3ce..c8476c351 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/DummyTestToken.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java @@ -18,36 +18,25 @@ */ package org.apache.polaris.core.persistence.pagination; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import java.util.Optional; -import java.util.OptionalInt; -import org.apache.polaris.immutables.PolarisImmutable; +import java.util.List; -@PolarisImmutable -@JsonSerialize(as = ImmutableDummyTestToken.class) -@JsonDeserialize(as = ImmutableDummyTestToken.class) -public interface DummyTestToken extends Token { - String ID = "test-dummy"; +/** + * A {@link PageToken} implementation for readers who want to read everything. The behavior when + * using this token should be the same as when reading without a token. + */ +public class ReadEverythingPageToken extends PageToken { - Optional<String> s(); + public static String PREFIX = "read-everything"; - OptionalInt i(); + public ReadEverythingPageToken() {} @Override - default String getT() { - return ID; + public String toTokenString() { + return PREFIX; } - final class DummyTestTokenType implements TokenType { - @Override - public String id() { - return ID; - } - - @Override - public Class<? extends Token> javaType() { - return DummyTestToken.class; - } + @Override + protected PageToken updated(List<?> newData) { + return new DonePageToken(); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Token.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Token.java deleted file mode 100644 index 34adb9560..000000000 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Token.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.polaris.core.persistence.pagination; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonTypeInfo; -import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; -import org.immutables.value.Value; - -/** - * Token base interface. - * - * <p>Concrete token implementations extend this {@link Token} interface and provide a Java services - * registered class that implements {@link Token.TokenType}. - * - * <p>Serialization property names should be intentionally short to reduce the size of the - * serialized paging token. - * - * <p>Example: - * - * {@snippet : - * @PolarisImmutable - * @JsonSerialize(as = ImmutableExampleToken.class) - * @JsonDeserialize(as = ImmutableExampleToken.class) - * public interface ExampleToken extends Token { - * String ID = "example"; - * - * @Override - * default String getT() { - * return ID; - * } - * - * @JsonProperty("a") - * long a(); - * - * @JsonProperty("b") - * String b(); - * - * static ExampleToken newExampleToken(long a, String b) { - * return ImmutableExampleToken.builder().a(a).b(b).build(); - * } - * - * final class ExampleTokenType implements TokenType { - * @Override - * public String id() { - * return ID; - * } - * - * @Override - * public Class<? extends Token> javaType() { - * return ExampleToken.class; - * } - * } - * } - * } - * - * plus a resource file {@code - * META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType} containing - * {@code org.apache.polaris.examples.pagetoken.ExampleToken$ExampleTokenType}. - */ -@JsonTypeIdResolver(PageTokenUtil.TokenTypeIdResolver.class) -@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, property = "t", visible = true) -public interface Token { - - @Value.Redacted - @JsonIgnore - // must use 'getT' here, otherwise the property won't be properly "wired" to be the type info and - // Jackson (deserialization) fails with - // 'com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "t"', if - // this property is just named 'String t()' - String getT(); - - /** Token type specification, referenced via Java's service loader mechanism. */ - interface TokenType { - /** - * ID of the token type, must be equal to the result of {@link Token#getT()} of the concrete - * {@link #javaType() token type}. - */ - String id(); - - /** Concrete token type. */ - Class<? extends Token> javaType(); - } -} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index 8c8e26eb8..e79dafcf5 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -31,7 +31,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; -import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.AsyncTaskType; @@ -687,8 +686,8 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { } /** - * See {@link PolarisMetaStoreManager#listEntities(PolarisCallContext, List, PolarisEntityType, - * PolarisEntitySubType, PageToken)} + * See {@link #listEntities(PolarisCallContext, List, PolarisEntityType, PolarisEntitySubType, + * PageToken)} */ private @Nonnull ListEntitiesResult listEntities( @Nonnull PolarisCallContext callCtx, @@ -702,24 +701,23 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { // return if we failed to resolve if (resolver.isFailure()) { - return new ListEntitiesResult(BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED, null); - } - - Predicate<PolarisBaseEntity> filter = entity -> true; - // prune the returned list with only entities matching the entity subtype - if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) { - filter = e -> e.getSubTypeCode() == entitySubType.getCode(); + return new ListEntitiesResult( + BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED, null, Optional.empty()); } // return list of active entities Page<EntityNameLookupRecord> resultPage = ms.listEntitiesInCurrentTxn( - callCtx, - resolver.getCatalogIdOrNull(), - resolver.getParentId(), - entityType, - filter, - pageToken); + callCtx, resolver.getCatalogIdOrNull(), resolver.getParentId(), entityType, pageToken); + + // prune the returned list with only entities matching the entity subtype + if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) { + resultPage = + pageToken.buildNextPage( + resultPage.items.stream() + .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) + .collect(Collectors.toList())); + } // done return ListEntitiesResult.fromPage(resultPage); @@ -1078,7 +1076,7 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { } createdEntities.add(entityCreateResult.getEntity()); } - return new EntitiesResult(Page.fromItems(createdEntities)); + return new EntitiesResult(createdEntities); }); } @@ -1182,7 +1180,7 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { } // good, all success - return new EntitiesResult(Page.fromItems(updatedEntities)); + return new EntitiesResult(updatedEntities); } /** {@inheritDoc} */ @@ -1387,7 +1385,7 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { entity -> true, Function.identity(), PageToken.fromLimit(2)) - .items(); + .items; // if we have 2, we cannot drop the catalog. If only one left, better be the admin role if (catalogRoles.size() > 1) { @@ -1973,42 +1971,36 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { Function.identity(), pageToken); - List<PolarisBaseEntity> loadedTasks = - availableTasks.items().stream() - .map( - task -> { - PolarisBaseEntity.Builder updatedTask = new PolarisBaseEntity.Builder(task); - Map<String, String> properties = - PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); - properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); - properties.put( - PolarisTaskConstants.LAST_ATTEMPT_START_TIME, - String.valueOf(callCtx.getClock().millis())); - properties.put( - PolarisTaskConstants.ATTEMPT_COUNT, - String.valueOf( - Integer.parseInt( - properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) - + 1)); - updatedTask.properties( - PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); - EntityResult result = - updateEntityPropertiesIfNotChanged(callCtx, ms, null, updatedTask.build()); - if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { - return result.getEntity(); - } else { - // TODO: Consider performing incremental leasing of individual tasks one at a - // time - // instead of requiring all-or-none semantics for all the tasks we think we - // listed, - // or else contention could be very bad. - ms.rollback(); - throw new RetryOnConcurrencyException( - "Failed to lease available task with status %s, info: %s", - result.getReturnStatus(), result.getExtraInformation()); - } - }) - .collect(Collectors.toList()); + List<PolarisBaseEntity> loadedTasks = new ArrayList<>(); + availableTasks.items.forEach( + task -> { + PolarisBaseEntity.Builder updatedTask = new PolarisBaseEntity.Builder(task); + Map<String, String> properties = + PolarisObjectMapperUtil.deserializeProperties(callCtx, task.getProperties()); + properties.put(PolarisTaskConstants.LAST_ATTEMPT_EXECUTOR_ID, executorId); + properties.put( + PolarisTaskConstants.LAST_ATTEMPT_START_TIME, + String.valueOf(callCtx.getClock().millis())); + properties.put( + PolarisTaskConstants.ATTEMPT_COUNT, + String.valueOf( + Integer.parseInt(properties.getOrDefault(PolarisTaskConstants.ATTEMPT_COUNT, "0")) + + 1)); + updatedTask.properties(PolarisObjectMapperUtil.serializeProperties(callCtx, properties)); + EntityResult result = + updateEntityPropertiesIfNotChanged(callCtx, ms, null, updatedTask.build()); + if (result.getReturnStatus() == BaseResult.ReturnStatus.SUCCESS) { + loadedTasks.add(result.getEntity()); + } else { + // TODO: Consider performing incremental leasing of individual tasks one at a time + // instead of requiring all-or-none semantics for all the tasks we think we listed, + // or else contention could be very bad. + ms.rollback(); + throw new RetryOnConcurrencyException( + "Failed to lease available task with status %s, info: %s", + result.getReturnStatus(), result.getExtraInformation()); + } + }); return EntitiesResult.fromPage(Page.fromItems(loadedTasks)); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java index 3bc7fd976..12907b08d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java @@ -21,7 +21,6 @@ package org.apache.polaris.core.persistence.transactional; import com.google.common.base.Predicates; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; -import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.function.Function; @@ -45,7 +44,7 @@ import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; import org.apache.polaris.core.persistence.BaseMetaStoreManager; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; -import org.apache.polaris.core.persistence.pagination.EntityIdToken; +import org.apache.polaris.core.persistence.pagination.HasPageSize; import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; @@ -368,23 +367,13 @@ public class TreeMapTransactionalPersistenceImpl extends AbstractTransactionalPe .map( nameRecord -> this.lookupEntityInCurrentTxn( - callCtx, catalogId, nameRecord.getId(), entityType.getCode())); - - Predicate<PolarisBaseEntity> tokenFilter = - pageToken - .valueAs(EntityIdToken.class) - .map( - entityIdToken -> { - var nextId = entityIdToken.entityId(); - return (Predicate<PolarisBaseEntity>) e -> e.getId() > nextId; - }) - .orElse(e -> true); - - data = data.sorted(Comparator.comparingLong(PolarisEntityCore::getId)).filter(tokenFilter); - - data = data.filter(entityFilter); + callCtx, catalogId, nameRecord.getId(), entityType.getCode())) + .filter(entityFilter); + if (pageToken instanceof HasPageSize) { + data = data.limit(((HasPageSize) pageToken).getPageSize()); + } - return Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity); + return Page.fromItems(data.map(transformer).collect(Collectors.toList())); } /** {@inheritDoc} */ diff --git a/polaris-core/src/main/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType b/polaris-core/src/main/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType deleted file mode 100644 index 3579dd29b..000000000 --- a/polaris-core/src/main/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -org.apache.polaris.core.persistence.pagination.EntityIdToken$EntityIdTokenType diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/PageTokenTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/PageTokenTest.java deleted file mode 100644 index 338bbc53f..000000000 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/pagination/PageTokenTest.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.polaris.core.persistence.pagination; - -import static org.junit.jupiter.params.provider.Arguments.arguments; - -import java.util.OptionalInt; -import java.util.function.Function; -import java.util.stream.Stream; -import org.assertj.core.api.SoftAssertions; -import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; -import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -@ExtendWith(SoftAssertionsExtension.class) -class PageTokenTest { - @InjectSoftAssertions SoftAssertions soft; - - @Test - public void testReadEverything() { - PageToken r = PageToken.readEverything(); - soft.assertThat(r.paginationRequested()).isFalse(); - soft.assertThat(r.pageSize()).isEmpty(); - soft.assertThat(r.value()).isEmpty(); - - Page<Integer> pageEverything = - Page.mapped( - r, - Stream.of(1, 2, 3, 4), - Function.identity(), - i -> i != null ? EntityIdToken.fromEntityId(i) : null); - soft.assertThat(pageEverything.encodedResponseToken()).isNull(); - soft.assertThat(pageEverything.items()).containsExactly(1, 2, 3, 4); - - r = PageToken.build(null, null); - soft.assertThat(r.paginationRequested()).isFalse(); - soft.assertThat(r.pageSize()).isEmpty(); - soft.assertThat(r.value()).isEmpty(); - } - - @Test - public void testLimit() { - PageToken r = PageToken.fromLimit(123); - soft.assertThat(r).isEqualTo(PageToken.build(null, 123)); - soft.assertThat(r.paginationRequested()).isTrue(); - soft.assertThat(r.pageSize()).isEqualTo(OptionalInt.of(123)); - soft.assertThat(r.value()).isEmpty(); - } - - @Test - public void testTokenValueForPaging() { - PageToken r = PageToken.fromLimit(2); - soft.assertThat(r).isEqualTo(PageToken.build(null, 2)); - Page<Integer> pageMoreData = - Page.mapped( - r, - Stream.of(1, 2, 3, 4), - Function.identity(), - i -> i != null ? EntityIdToken.fromEntityId(i) : null); - soft.assertThat(pageMoreData.encodedResponseToken()).isNotBlank(); - soft.assertThat(pageMoreData.items()).containsExactly(1, 2); - - // last page (no more data) - number of items is equal to the requested page size - Page<Integer> lastPageSaturated = - Page.mapped( - r, - Stream.of(3, 4), - Function.identity(), - i -> i != null ? EntityIdToken.fromEntityId(i) : null); - // last page (no more data) - next-token must be null - soft.assertThat(lastPageSaturated.encodedResponseToken()).isNull(); - soft.assertThat(lastPageSaturated.items()).containsExactly(3, 4); - - // last page (no more data) - number of items is less than the requested page size - Page<Integer> lastPageNotSaturated = - Page.mapped( - r, - Stream.of(3), - Function.identity(), - i -> i != null ? EntityIdToken.fromEntityId(i) : null); - soft.assertThat(lastPageNotSaturated.encodedResponseToken()).isNull(); - soft.assertThat(lastPageNotSaturated.items()).containsExactly(3); - - r = PageToken.fromLimit(200); - soft.assertThat(r).isEqualTo(PageToken.build(null, 200)); - Page<Integer> page200 = - Page.mapped( - r, - Stream.of(1, 2, 3, 4), - Function.identity(), - i -> i != null ? EntityIdToken.fromEntityId(i) : null); - soft.assertThat(page200.encodedResponseToken()).isNull(); - soft.assertThat(page200.items()).containsExactly(1, 2, 3, 4); - } - - @ParameterizedTest - @MethodSource - public void testDeSer(Integer pageSize, String serializedPageToken, PageToken expectedPageToken) { - soft.assertThat(PageTokenUtil.decodePageRequest(serializedPageToken, pageSize)) - .isEqualTo(expectedPageToken); - } - - static Stream<Arguments> testDeSer() { - var entity42page123 = - ImmutablePageToken.builder().pageSize(123).value(EntityIdToken.fromEntityId(42)).build(); - var entity42page123ser = PageTokenUtil.serializePageToken(entity42page123); - return Stream.of( - arguments(null, null, PageToken.readEverything()), - arguments(123, null, PageToken.fromLimit(123)), - arguments(123, entity42page123ser, entity42page123), - arguments( - 123, - PageTokenUtil.serializePageToken( - ImmutablePageToken.builder() - .pageSize(999999) - .value(EntityIdToken.fromEntityId(42)) - .build()), - entity42page123)); - } - - @ParameterizedTest - @MethodSource - public void testApiRoundTrip(Token token) { - PageToken request = PageToken.build(null, 123); - Page<?> page = Page.mapped(request, Stream.of("i1"), Function.identity(), x -> token); - soft.assertThat(page.encodedResponseToken()).isNotBlank(); - - PageToken r = PageToken.build(page.encodedResponseToken(), null); - soft.assertThat(r.value()).contains(token); - soft.assertThat(r.paginationRequested()).isTrue(); - soft.assertThat(r.pageSize()).isEqualTo(OptionalInt.of(123)); - - r = PageToken.build(page.encodedResponseToken(), 456); - soft.assertThat(r.value()).contains(token); - soft.assertThat(r.paginationRequested()).isTrue(); - soft.assertThat(r.pageSize()).isEqualTo(OptionalInt.of(456)); - } - - static Stream<Token> testApiRoundTrip() { - return Stream.of( - EntityIdToken.fromEntityId(123), - EntityIdToken.fromEntityId(456), - ImmutableDummyTestToken.builder().s("str").i(42).build(), - ImmutableDummyTestToken.builder().i(42).build(), - ImmutableDummyTestToken.builder().build()); - } -} diff --git a/polaris-core/src/test/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType b/polaris-core/src/test/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType deleted file mode 100644 index 26778107e..000000000 --- a/polaris-core/src/test/resources/META-INF/services/org.apache.polaris.core.persistence.pagination.Token$TokenType +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -org.apache.polaris.core.persistence.pagination.DummyTestToken$DummyTestTokenType diff --git a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java index 905934e67..30f5939ed 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java @@ -116,7 +116,6 @@ import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.persistence.cache.InMemoryEntityCache; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; -import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.secrets.UserSecretsManagerFactory; @@ -212,8 +211,6 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { "test", "polaris.readiness.ignore-severe-issues", "true", - "LIST_PAGINATION_ENABLED", - "true", "polaris.features.\"ALLOW_TABLE_LOCATION_OVERLAP\"", "true"); } @@ -2329,131 +2326,4 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld); Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew); } - - private static PageToken nextRequest(Page<?> previousPage) { - return PageToken.build(previousPage.encodedResponseToken(), null); - } - - @Test - public void testPaginatedListTables() { - Assumptions.assumeTrue( - requiresNamespaceCreate(), - "Only applicable if namespaces must be created before adding children"); - - catalog.createNamespace(NS); - - for (int i = 0; i < 5; i++) { - catalog.buildTable(TableIdentifier.of(NS, "pagination_table_" + i), SCHEMA).create(); - } - - try { - // List without pagination - Assertions.assertThat(catalog.listTables(NS)).isNotNull().hasSize(5); - - // List with a limit: - Page<?> firstListResult = catalog.listTables(NS, PageToken.fromLimit(2)); - Assertions.assertThat(firstListResult.items().size()).isEqualTo(2); - Assertions.assertThat(firstListResult.encodedResponseToken()).isNotNull().isNotEmpty(); - - // List using the previously obtained token: - Page<?> secondListResult = catalog.listTables(NS, nextRequest(firstListResult)); - Assertions.assertThat(secondListResult.items().size()).isEqualTo(2); - Assertions.assertThat(secondListResult.encodedResponseToken()).isNotNull().isNotEmpty(); - - // List using the final token: - Page<?> finalListResult = catalog.listTables(NS, nextRequest(secondListResult)); - Assertions.assertThat(finalListResult.items().size()).isEqualTo(1); - Assertions.assertThat(finalListResult.encodedResponseToken()).isNull(); - } finally { - for (int i = 0; i < 5; i++) { - catalog.dropTable(TableIdentifier.of(NS, "pagination_table_" + i)); - } - } - } - - @Test - public void testPaginatedListViews() { - Assumptions.assumeTrue( - requiresNamespaceCreate(), - "Only applicable if namespaces must be created before adding children"); - - catalog.createNamespace(NS); - - for (int i = 0; i < 5; i++) { - catalog - .buildView(TableIdentifier.of(NS, "pagination_view_" + i)) - .withQuery("a_" + i, "SELECT 1 id") - .withSchema(SCHEMA) - .withDefaultNamespace(NS) - .create(); - } - - try { - // List without pagination - Assertions.assertThat(catalog.listViews(NS)).isNotNull().hasSize(5); - - // List with a limit: - Page<?> firstListResult = catalog.listViews(NS, PageToken.fromLimit(2)); - Assertions.assertThat(firstListResult.items().size()).isEqualTo(2); - Assertions.assertThat(firstListResult.encodedResponseToken()).isNotNull().isNotEmpty(); - - // List using the previously obtained token: - Page<?> secondListResult = catalog.listViews(NS, nextRequest(firstListResult)); - Assertions.assertThat(secondListResult.items().size()).isEqualTo(2); - Assertions.assertThat(secondListResult.encodedResponseToken()).isNotNull().isNotEmpty(); - - // List using the final token: - Page<?> finalListResult = catalog.listViews(NS, nextRequest(secondListResult)); - Assertions.assertThat(finalListResult.items().size()).isEqualTo(1); - Assertions.assertThat(finalListResult.encodedResponseToken()).isNull(); - } finally { - for (int i = 0; i < 5; i++) { - catalog.dropTable(TableIdentifier.of(NS, "pagination_view_" + i)); - } - } - } - - @Test - public void testPaginatedListNamespaces() { - for (int i = 0; i < 5; i++) { - catalog.createNamespace(Namespace.of("pagination_namespace_" + i)); - } - - try { - // List without pagination - Assertions.assertThat(catalog.listNamespaces()).isNotNull().hasSize(5); - - // List with a limit: - Page<?> firstListResult = catalog.listNamespaces(Namespace.empty(), PageToken.fromLimit(2)); - Assertions.assertThat(firstListResult.items().size()).isEqualTo(2); - Assertions.assertThat(firstListResult.encodedResponseToken()).isNotNull().isNotEmpty(); - - // List using the previously obtained token: - Page<?> secondListResult = - catalog.listNamespaces(Namespace.empty(), nextRequest(firstListResult)); - Assertions.assertThat(secondListResult.items().size()).isEqualTo(2); - Assertions.assertThat(secondListResult.encodedResponseToken()).isNotNull().isNotEmpty(); - - // List using the final token: - Page<?> finalListResult = - catalog.listNamespaces(Namespace.empty(), nextRequest(secondListResult)); - Assertions.assertThat(finalListResult.items().size()).isEqualTo(1); - Assertions.assertThat(finalListResult.encodedResponseToken()).isNull(); - - // List with page size matching the amount of data, no more pages - Page<?> firstExactListResult = - catalog.listNamespaces(Namespace.empty(), PageToken.fromLimit(5)); - Assertions.assertThat(firstExactListResult.items().size()).isEqualTo(5); - Assertions.assertThat(firstExactListResult.encodedResponseToken()).isNull(); - - // List with huge page size: - Page<?> bigListResult = catalog.listNamespaces(Namespace.empty(), PageToken.fromLimit(9999)); - Assertions.assertThat(bigListResult.items().size()).isEqualTo(5); - Assertions.assertThat(bigListResult.encodedResponseToken()).isNull(); - } finally { - for (int i = 0; i < 5; i++) { - catalog.dropNamespace(Namespace.of("pagination_namespace_" + i)); - } - } - } } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 031c3882f..1a8e76900 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -458,10 +458,14 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog @Override public List<TableIdentifier> listTables(Namespace namespace) { - return listTables(namespace, PageToken.readEverything()).items(); + return listTables(namespace, PageToken.readEverything()).items; } - public Page<TableIdentifier> listTables(Namespace namespace, PageToken pageToken) { + public Page<TableIdentifier> listTables(Namespace namespace, String pageToken, Integer pageSize) { + return listTables(namespace, buildPageToken(pageToken, pageSize)); + } + + private Page<TableIdentifier> listTables(Namespace namespace, PageToken pageToken) { if (!namespaceExists(namespace)) { throw new NoSuchNamespaceException( "Cannot list tables for namespace. Namespace does not exist: '%s'", namespace); @@ -774,10 +778,14 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog @Override public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException { - return listNamespaces(namespace, PageToken.readEverything()).items(); + return listNamespaces(namespace, PageToken.readEverything()).items; + } + + public Page<Namespace> listNamespaces(Namespace namespace, String pageToken, Integer pageSize) { + return listNamespaces(namespace, buildPageToken(pageToken, pageSize)); } - public Page<Namespace> listNamespaces(Namespace namespace, PageToken pageToken) + private Page<Namespace> listNamespaces(Namespace namespace, PageToken pageToken) throws NoSuchNamespaceException { PolarisResolvedPathWrapper resolvedEntities = resolvedEntityView.getResolvedPath(namespace); if (resolvedEntities == null) { @@ -793,12 +801,13 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog PolarisEntityType.NAMESPACE, PolarisEntitySubType.NULL_SUBTYPE, pageToken); + List<PolarisEntity.NameAndId> entities = + PolarisEntity.toNameAndIdList(listResult.getEntities()); + List<Namespace> namespaces = PolarisCatalogHelpers.nameAndIdToNamespaces(catalogPath, entities); return listResult - .getPage() - .map( - record -> - PolarisCatalogHelpers.nameAndIdToNamespace( - catalogPath, new PolarisEntity.NameAndId(record.getName(), record.getId()))); + .getPageToken() + .map(token -> new Page<>(token, namespaces)) + .orElseGet(() -> Page.fromItems(namespaces)); } @Override @@ -810,10 +819,14 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog @Override public List<TableIdentifier> listViews(Namespace namespace) { - return listViews(namespace, PageToken.readEverything()).items(); + return listViews(namespace, PageToken.readEverything()).items; } - public Page<TableIdentifier> listViews(Namespace namespace, PageToken pageToken) { + public Page<TableIdentifier> listViews(Namespace namespace, String pageToken, Integer pageSize) { + return listViews(namespace, buildPageToken(pageToken, pageSize)); + } + + private Page<TableIdentifier> listViews(Namespace namespace, PageToken pageToken) { if (!namespaceExists(namespace)) { throw new NoSuchNamespaceException( "Cannot list views for namespace. Namespace does not exist: '%s'", namespace); @@ -2583,11 +2596,15 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog PolarisEntityType.TABLE_LIKE, subType, pageToken); + List<PolarisEntity.NameAndId> entities = + PolarisEntity.toNameAndIdList(listResult.getEntities()); + List<TableIdentifier> identifiers = + PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath, entities); - Namespace parentNamespace = PolarisCatalogHelpers.parentNamespace(catalogPath); return listResult - .getPage() - .map(record -> TableIdentifier.of(parentNamespace, record.getName())); + .getPageToken() + .map(token -> new Page<>(token, identifiers)) + .orElseGet(() -> Page.fromItems(identifiers)); } /** @@ -2625,4 +2642,18 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog .getRealmConfig() .getConfig(FeatureConfiguration.MAX_METADATA_REFRESH_RETRIES); } + + /** Build a {@link PageToken} from a string and page size. */ + private PageToken buildPageToken(@Nullable String tokenString, @Nullable Integer pageSize) { + + boolean paginationEnabled = + callContext + .getRealmConfig() + .getConfig(FeatureConfiguration.LIST_PAGINATION_ENABLED, catalogEntity); + if (!paginationEnabled) { + return PageToken.readEverything(); + } else { + return PageToken.build(tokenString, pageSize); + } + } } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index d8ceea08d..c0b0b3259 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -91,7 +91,6 @@ import org.apache.polaris.core.persistence.TransactionWorkspaceMetaStoreManager; import org.apache.polaris.core.persistence.dao.entity.EntitiesResult; import org.apache.polaris.core.persistence.dao.entity.EntityWithPath; import org.apache.polaris.core.persistence.pagination.Page; -import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.PolarisStorageActions; @@ -184,11 +183,10 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab authorizeBasicNamespaceOperationOrThrow(op, parent); if (baseCatalog instanceof IcebergCatalog polarisCatalog) { - PageToken pageRequest = PageToken.build(pageToken, pageSize); - Page<Namespace> results = polarisCatalog.listNamespaces(parent, pageRequest); + Page<Namespace> results = polarisCatalog.listNamespaces(parent, pageToken, pageSize); return ListNamespacesResponse.builder() - .addAll(results.items()) - .nextPageToken(results.encodedResponseToken()) + .addAll(results.items) + .nextPageToken(results.pageToken.toTokenString()) .build(); } else { return catalogHandlerUtils.listNamespaces(namespaceCatalog, parent, pageToken, pageSize); @@ -345,11 +343,10 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab authorizeBasicNamespaceOperationOrThrow(op, namespace); if (baseCatalog instanceof IcebergCatalog polarisCatalog) { - PageToken pageRequest = PageToken.build(pageToken, pageSize); - Page<TableIdentifier> results = polarisCatalog.listTables(namespace, pageRequest); + Page<TableIdentifier> results = polarisCatalog.listTables(namespace, pageToken, pageSize); return ListTablesResponse.builder() - .addAll(results.items()) - .nextPageToken(results.encodedResponseToken()) + .addAll(results.items) + .nextPageToken(results.pageToken.toTokenString()) .build(); } else { return catalogHandlerUtils.listTables(baseCatalog, namespace, pageToken, pageSize); @@ -1008,11 +1005,10 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab authorizeBasicNamespaceOperationOrThrow(op, namespace); if (baseCatalog instanceof IcebergCatalog polarisCatalog) { - PageToken pageRequest = PageToken.build(pageToken, pageSize); - Page<TableIdentifier> results = polarisCatalog.listViews(namespace, pageRequest); + Page<TableIdentifier> results = polarisCatalog.listViews(namespace, pageToken, pageSize); return ListTablesResponse.builder() - .addAll(results.items()) - .nextPageToken(results.encodedResponseToken()) + .addAll(results.items) + .nextPageToken(results.pageToken.toTokenString()) .build(); } else if (baseCatalog instanceof ViewCatalog viewCatalog) { return catalogHandlerUtils.listViews(viewCatalog, namespace, pageToken, pageSize); diff --git a/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java new file mode 100644 index 000000000..97e52fb84 --- /dev/null +++ b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.persistence.pagination; + +import org.apache.polaris.core.persistence.pagination.DonePageToken; +import org.apache.polaris.core.persistence.pagination.HasPageSize; +import org.apache.polaris.core.persistence.pagination.PageToken; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PageTokenTest { + private static final Logger LOGGER = LoggerFactory.getLogger(PageTokenTest.class); + + @Test + void testDoneToken() { + Assertions.assertThat(new DonePageToken()).doesNotReturn(null, PageToken::toString); + Assertions.assertThat(new DonePageToken()).returns(null, PageToken::toTokenString); + Assertions.assertThat(new DonePageToken()).isEqualTo(new DonePageToken()); + Assertions.assertThat(new DonePageToken().hashCode()).isEqualTo(new DonePageToken().hashCode()); + } + + @Test + void testReadEverythingPageToken() { + PageToken token = PageToken.readEverything(); + + Assertions.assertThat(token.toString()).isNotNull(); + Assertions.assertThat(token.toTokenString()).isNotNull(); + Assertions.assertThat(token).isNotInstanceOf(HasPageSize.class); + + Assertions.assertThat(PageToken.readEverything()).isEqualTo(PageToken.readEverything()); + } +}