This is an automated email from the ASF dual-hosted git repository. emaynard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push: new 44064cbf4 Interface changes for pagination (#1528) 44064cbf4 is described below commit 44064cbf4e4ab7aa29eeafdfae8f41510add8784 Author: Eric Maynard <eric.maynard+...@snowflake.com> AuthorDate: Fri May 9 10:39:02 2025 -0700 Interface changes for pagination (#1528) * add missing apis * more tests, fixes * clean up drop * autolint * changes per review * revert iceberg messages to comply with oss tests * another revert * more iceberg catalog changes * autolint * dependency issues * more wiring * continuing rebase * remaining issues are related to task loading * re-add tests * debugging * fix failing tests * fix another test * changes per review * autolint * some fixes * stable * updates for new persistence * fix * continuing work * more reverts * continue reverts * more reverts * yank tests * autolint * test reverts * try to support limit without real page tokens * autolint * Stable * change comment * autolint * remove catalog config for now * changes per review * more tweaks * simplify types per review * Stable, about to refactor more * re-stable * polish * autolint * more changes per review * stable --- .../PolarisEclipseLinkMetaStoreSessionImpl.java | 47 +++++---- .../impl/eclipselink/PolarisEclipseLinkStore.java | 7 +- .../relational/jdbc/JdbcBasePersistenceImpl.java | 45 +++++---- .../polaris/core/config/FeatureConfiguration.java | 7 ++ .../AtomicOperationMetaStoreManager.java | 47 +++++---- .../polaris/core/persistence/BasePersistence.java | 20 ++-- .../core/persistence/PolarisMetaStoreManager.java | 9 +- .../TransactionWorkspaceMetaStoreManager.java | 6 +- .../persistence/dao/entity/EntitiesResult.java | 25 ++++- .../persistence/dao/entity/ListEntitiesResult.java | 27 +++++- .../core/persistence/pagination/DonePageToken.java | 40 ++++++++ .../core/persistence/pagination/HasPageSize.java | 27 ++++++ .../persistence/pagination/LimitPageToken.java | 52 ++++++++++ 
.../polaris/core/persistence/pagination/Page.java | 42 ++++++++ .../core/persistence/pagination/PageToken.java | 99 +++++++++++++++++++ .../pagination/ReadEverythingPageToken.java | 42 ++++++++ .../AbstractTransactionalPersistence.java | 26 +++-- .../TransactionalMetaStoreManagerImpl.java | 66 +++++++------ .../transactional/TransactionalPersistence.java | 18 ++-- .../TreeMapTransactionalPersistenceImpl.java | 54 ++++++----- .../BasePolarisMetaStoreManagerTest.java | 21 ++-- .../persistence/PolarisTestMetaStoreManager.java | 26 +++-- .../quarkus/catalog/IcebergCatalogTest.java | 10 +- .../quarkus/task/TableCleanupTaskHandlerTest.java | 13 +-- .../polaris/service/admin/PolarisAdminService.java | 13 ++- .../catalog/generic/GenericTableCatalog.java | 4 +- .../service/catalog/iceberg/IcebergCatalog.java | 106 ++++++++++++++++----- .../catalog/iceberg/IcebergCatalogAdapter.java | 13 ++- .../catalog/iceberg/IcebergCatalogHandler.java | 53 +++++++++++ .../service/catalog/policy/PolicyCatalog.java | 4 +- .../service/catalog/io/FileIOFactoryTest.java | 3 +- .../persistence/pagination/PageTokenTest.java | 50 ++++++++++ 32 files changed, 823 insertions(+), 199 deletions(-) diff --git a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java index e19b0ef72..b6bd23762 100644 --- a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java +++ b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java @@ -37,6 +37,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import 
java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.EntityNameLookupRecord; @@ -52,6 +53,9 @@ import org.apache.polaris.core.exceptions.AlreadyExistsException; import org.apache.polaris.core.persistence.BaseMetaStoreManager; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; +import org.apache.polaris.core.persistence.pagination.HasPageSize; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; @@ -419,29 +423,30 @@ public class PolarisEclipseLinkMetaStoreSessionImpl extends AbstractTransactiona /** {@inheritDoc} */ @Override - public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn( + public @Nonnull Page<EntityNameLookupRecord> listEntitiesInCurrentTxn( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, - @Nonnull PolarisEntityType entityType) { + @Nonnull PolarisEntityType entityType, + @Nonnull PageToken pageToken) { return this.listEntitiesInCurrentTxn( - callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue()); + callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(), pageToken); } @Override - public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn( + public @Nonnull Page<EntityNameLookupRecord> listEntitiesInCurrentTxn( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - @Nonnull Predicate<PolarisBaseEntity> entityFilter) { + @Nonnull Predicate<PolarisBaseEntity> entityFilter, + @Nonnull PageToken 
pageToken) { // full range scan under the parent for that type return this.listEntitiesInCurrentTxn( callCtx, catalogId, parentId, entityType, - Integer.MAX_VALUE, entityFilter, entity -> new EntityNameLookupRecord( @@ -450,27 +455,33 @@ public class PolarisEclipseLinkMetaStoreSessionImpl extends AbstractTransactiona entity.getParentId(), entity.getName(), entity.getTypeCode(), - entity.getSubTypeCode())); + entity.getSubTypeCode()), + pageToken); } @Override - public @Nonnull <T> List<T> listEntitiesInCurrentTxn( + public @Nonnull <T> Page<T> listEntitiesInCurrentTxn( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - int limit, @Nonnull Predicate<PolarisBaseEntity> entityFilter, - @Nonnull Function<PolarisBaseEntity, T> transformer) { + @Nonnull Function<PolarisBaseEntity, T> transformer, + @Nonnull PageToken pageToken) { // full range scan under the parent for that type - return this.store - .lookupFullEntitiesActive(localSession.get(), catalogId, parentId, entityType) - .stream() - .map(ModelEntity::toEntity) - .filter(entityFilter) - .limit(limit) - .map(transformer) - .collect(Collectors.toList()); + Stream<PolarisBaseEntity> data = + this.store + .lookupFullEntitiesActive( + localSession.get(), catalogId, parentId, entityType, pageToken) + .stream() + .map(ModelEntity::toEntity) + .filter(entityFilter); + + if (pageToken instanceof HasPageSize hasPageSize) { + data = data.limit(hasPageSize.getPageSize()); + } + + return Page.fromItems(data.map(transformer).collect(Collectors.toList())); } /** {@inheritDoc} */ diff --git a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java index 0988dcb7f..4e992e07f 100644 --- 
a/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java +++ b/extension/persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java @@ -35,6 +35,7 @@ import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.jpa.models.ModelEntity; import org.apache.polaris.jpa.models.ModelEntityActive; @@ -282,7 +283,11 @@ public class PolarisEclipseLinkStore { } List<ModelEntity> lookupFullEntitiesActive( - EntityManager session, long catalogId, long parentId, @Nonnull PolarisEntityType entityType) { + EntityManager session, + long catalogId, + long parentId, + @Nonnull PolarisEntityType entityType, + @Nonnull PageToken pageToken) { diagnosticServices.check(session != null, "session_is_null"); checkInitialized(); diff --git a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java index 38448934f..cd6a0b6c0 100644 --- a/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java +++ b/extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcBasePersistenceImpl.java @@ -49,6 +49,9 @@ import org.apache.polaris.core.persistence.IntegrationPersistence; import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException; import 
org.apache.polaris.core.persistence.PrincipalSecretsGenerator; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; +import org.apache.polaris.core.persistence.pagination.HasPageSize; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyType; import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; @@ -352,49 +355,51 @@ public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPers @Nonnull @Override - public List<EntityNameLookupRecord> listEntities( + public Page<EntityNameLookupRecord> listEntities( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, - @Nonnull PolarisEntityType entityType) { + @Nonnull PolarisEntityType entityType, + @Nonnull PageToken pageToken) { return listEntities( callCtx, catalogId, parentId, entityType, - Integer.MAX_VALUE, entity -> true, - EntityNameLookupRecord::new); + EntityNameLookupRecord::new, + pageToken); } @Nonnull @Override - public List<EntityNameLookupRecord> listEntities( + public Page<EntityNameLookupRecord> listEntities( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - @Nonnull Predicate<PolarisBaseEntity> entityFilter) { + @Nonnull Predicate<PolarisBaseEntity> entityFilter, + @Nonnull PageToken pageToken) { return listEntities( callCtx, catalogId, parentId, entityType, - Integer.MAX_VALUE, entityFilter, - EntityNameLookupRecord::new); + EntityNameLookupRecord::new, + pageToken); } @Nonnull @Override - public <T> List<T> listEntities( + public <T> Page<T> listEntities( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, PolarisEntityType entityType, - int limit, @Nonnull Predicate<PolarisBaseEntity> entityFilter, - @Nonnull Function<PolarisBaseEntity, T> transformer) { + @Nonnull 
Function<PolarisBaseEntity, T> transformer, + @Nonnull PageToken pageToken) { Map<String, Object> params = Map.of( "catalog_id", @@ -415,15 +420,17 @@ public class JdbcBasePersistenceImpl implements BasePersistence, IntegrationPers query, new ModelEntity(), stream -> { - stream - .map(ModelEntity::toEntity) - .filter(entityFilter) - .limit(limit) - .forEach(results::add); + var data = stream.map(ModelEntity::toEntity).filter(entityFilter); + if (pageToken instanceof HasPageSize hasPageSize) { + data = data.limit(hasPageSize.getPageSize()); + } + data.forEach(results::add); }); - return results == null - ? Collections.emptyList() - : results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList()); + List<T> resultsOrEmpty = + results == null + ? Collections.emptyList() + : results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList()); + return Page.fromItems(resultsOrEmpty); } catch (SQLException e) { throw new RuntimeException( String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e); diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index ee663dbb1..26437a8f6 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -202,6 +202,13 @@ public class FeatureConfiguration<T> extends PolarisConfiguration<T> { .defaultValue(2) .buildFeatureConfiguration(); + public static final PolarisConfiguration<Boolean> LIST_PAGINATION_ENABLED = + PolarisConfiguration.<Boolean>builder() + .key("LIST_PAGINATION_ENABLED") + .description("If set to true, pagination for APIs like listTables is enabled.") + .defaultValue(false) + .buildFeatureConfiguration(); + public static final FeatureConfiguration<Boolean> ENABLE_GENERIC_TABLES = 
PolarisConfiguration.<Boolean>builder() .key("ENABLE_GENERIC_TABLES") diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index b2eb8fcfa..2a32fb6f9 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -62,6 +62,8 @@ import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyMappingUtil; @@ -687,7 +689,8 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { @Nonnull PolarisCallContext callCtx, @Nullable List<PolarisEntityCore> catalogPath, @Nonnull PolarisEntityType entityType, - @Nonnull PolarisEntitySubType entitySubType) { + @Nonnull PolarisEntitySubType entitySubType, + @Nonnull PageToken pageToken) { // get meta store we should be using BasePersistence ms = callCtx.getMetaStore(); @@ -699,15 +702,16 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { catalogPath == null || catalogPath.size() == 0 ? 
0l : catalogPath.get(catalogPath.size() - 1).getId(); - List<EntityNameLookupRecord> toreturnList = - ms.listEntities(callCtx, catalogId, parentId, entityType); + Page<EntityNameLookupRecord> resultPage = + ms.listEntities(callCtx, catalogId, parentId, entityType, pageToken); // prune the returned list with only entities matching the entity subtype if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) { - toreturnList = - toreturnList.stream() - .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) - .collect(Collectors.toList()); + resultPage = + pageToken.buildNextPage( + resultPage.items.stream() + .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) + .collect(Collectors.toList())); } // TODO: Use post-validation to enforce consistent view against catalogPath. In the @@ -717,7 +721,7 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { // in-flight request (the cache-based resolution follows a different path entirely). // done - return new ListEntitiesResult(toreturnList); + return ListEntitiesResult.fromPage(resultPage); } /** {@inheritDoc} */ @@ -1176,13 +1180,14 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { // get the list of catalog roles, at most 2 List<PolarisBaseEntity> catalogRoles = ms.listEntities( - callCtx, - catalogId, - catalogId, - PolarisEntityType.CATALOG_ROLE, - 2, - entity -> true, - Function.identity()); + callCtx, + catalogId, + catalogId, + PolarisEntityType.CATALOG_ROLE, + entity -> true, + Function.identity(), + PageToken.fromLimit(2)) + .items; // if we have 2, we cannot drop the catalog. 
If only one left, better be the admin role if (catalogRoles.size() > 1) { @@ -1488,17 +1493,16 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { @Override public @Nonnull EntitiesResult loadTasks( - @Nonnull PolarisCallContext callCtx, String executorId, int limit) { + @Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken) { BasePersistence ms = callCtx.getMetaStore(); // find all available tasks - List<PolarisBaseEntity> availableTasks = + Page<PolarisBaseEntity> availableTasks = ms.listEntities( callCtx, PolarisEntityConstants.getRootEntityId(), PolarisEntityConstants.getRootEntityId(), PolarisEntityType.TASK, - limit, entity -> { PolarisObjectMapperUtil.TaskExecutionState taskState = PolarisObjectMapperUtil.parseTaskState(entity); @@ -1513,11 +1517,12 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { || taskState.executor == null || callCtx.getClock().millis() - taskState.lastAttemptStartTime > taskAgeTimeout; }, - Function.identity()); + Function.identity(), + pageToken); List<PolarisBaseEntity> loadedTasks = new ArrayList<>(); final AtomicInteger failedLeaseCount = new AtomicInteger(0); - availableTasks.forEach( + availableTasks.items.forEach( task -> { PolarisBaseEntity updatedTask = new PolarisBaseEntity(task); Map<String, String> properties = @@ -1554,7 +1559,7 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { throw new RetryOnConcurrencyException( "Failed to lease any of %s tasks due to concurrent leases", failedLeaseCount.get()); } - return new EntitiesResult(loadedTasks); + return EntitiesResult.fromPage(Page.fromItems(loadedTasks)); } /** {@inheritDoc} */ diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java index 75b18fb45..9d6f6ba35 100644 --- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java @@ -31,6 +31,8 @@ import org.apache.polaris.core.entity.PolarisEntityCore; import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PolarisGrantRecord; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolicyMappingPersistence; /** @@ -270,14 +272,16 @@ public interface BasePersistence extends PolicyMappingPersistence { * @param catalogId catalog id for that entity, NULL_ID if the entity is top-level * @param parentId id of the parent, can be the special 0 value representing the root entity * @param entityType type of entities to list + * @param pageToken the token to start listing after * @return the list of entities for the specified list operation */ @Nonnull - List<EntityNameLookupRecord> listEntities( + Page<EntityNameLookupRecord> listEntities( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, - @Nonnull PolarisEntityType entityType); + @Nonnull PolarisEntityType entityType, + @Nonnull PageToken pageToken); /** * List entities where some predicate returns true @@ -288,15 +292,17 @@ public interface BasePersistence extends PolicyMappingPersistence { * @param entityType type of entities to list * @param entityFilter the filter to be applied to each entity. 
Only entities where the predicate * returns true are returned in the list + * @param pageToken the token to start listing after * @return the list of entities for which the predicate returns true */ @Nonnull - List<EntityNameLookupRecord> listEntities( + Page<EntityNameLookupRecord> listEntities( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - @Nonnull Predicate<PolarisBaseEntity> entityFilter); + @Nonnull Predicate<PolarisBaseEntity> entityFilter, + @Nonnull PageToken pageToken); /** * List entities where some predicate returns true and transform the entities with a function @@ -313,14 +319,14 @@ public interface BasePersistence extends PolicyMappingPersistence { * @return the list of entities for which the predicate returns true */ @Nonnull - <T> List<T> listEntities( + <T> Page<T> listEntities( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - int limit, @Nonnull Predicate<PolarisBaseEntity> entityFilter, - @Nonnull Function<PolarisBaseEntity, T> transformer); + @Nonnull Function<PolarisBaseEntity, T> transformer, + PageToken pageToken); /** * Lookup the current entityGrantRecordsVersion for the specified entity. 
That version is changed diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java index da2ab521e..2a20ad5c1 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java @@ -42,6 +42,7 @@ import org.apache.polaris.core.persistence.dao.entity.EntityWithPath; import org.apache.polaris.core.persistence.dao.entity.GenerateEntityIdResult; import org.apache.polaris.core.persistence.dao.entity.ListEntitiesResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingManager; import org.apache.polaris.core.storage.PolarisCredentialVendor; @@ -120,7 +121,8 @@ public interface PolarisMetaStoreManager @Nonnull PolarisCallContext callCtx, @Nullable List<PolarisEntityCore> catalogPath, @Nonnull PolarisEntityType entityType, - @Nonnull PolarisEntitySubType entitySubType); + @Nonnull PolarisEntitySubType entitySubType, + @Nonnull PageToken pageToken); /** * Generate a new unique id that can be used by the Polaris client when it needs to create a new @@ -300,11 +302,12 @@ public interface PolarisMetaStoreManager * * @param callCtx call context * @param executorId executor id - * @param limit limit + * @param pageToken page token to start after * @return list of tasks to be completed */ @Nonnull - EntitiesResult loadTasks(@Nonnull PolarisCallContext callCtx, String executorId, int limit); + EntitiesResult loadTasks( + @Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken); /** * Load change tracking information for a set of entities in one single shot and return for each diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java index 9b5a6b6db..b7ba47e83 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -50,6 +50,7 @@ import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyType; @@ -118,7 +119,8 @@ public class TransactionWorkspaceMetaStoreManager implements PolarisMetaStoreMan @Nonnull PolarisCallContext callCtx, @Nullable List<PolarisEntityCore> catalogPath, @Nonnull PolarisEntityType entityType, - @Nonnull PolarisEntitySubType entitySubType) { + @Nonnull PolarisEntitySubType entitySubType, + @Nonnull PageToken pageToken) { callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "listEntities"); return null; } @@ -320,7 +322,7 @@ public class TransactionWorkspaceMetaStoreManager implements PolarisMetaStoreMan @Override public EntitiesResult loadTasks( - @Nonnull PolarisCallContext callCtx, String executorId, int limit) { + @Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken) { callCtx.getDiagServices().fail("illegal_method_in_transaction_workspace", "loadTasks"); return null; } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java index 70d9edcf5..e27b69680 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/EntitiesResult.java @@ -23,13 +23,21 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.List; +import java.util.Optional; import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; /** a set of returned entities result */ public class EntitiesResult extends BaseResult { // null if not success. Else the list of entities being returned private final List<PolarisBaseEntity> entities; + private final Optional<PageToken> pageTokenOpt; + + public static EntitiesResult fromPage(Page<PolarisBaseEntity> page) { + return new EntitiesResult(page.items, Optional.ofNullable(page.pageToken)); + } /** * Constructor for an error @@ -40,6 +48,11 @@ public class EntitiesResult extends BaseResult { public EntitiesResult(@Nonnull ReturnStatus errorStatus, @Nullable String extraInformation) { super(errorStatus, extraInformation); this.entities = null; + this.pageTokenOpt = Optional.empty(); + } + + public EntitiesResult(@Nonnull List<PolarisBaseEntity> entities) { + this(entities, Optional.empty()); } /** @@ -47,21 +60,29 @@ public class EntitiesResult extends BaseResult { * * @param entities list of entities being returned, implies success */ - public EntitiesResult(@Nonnull List<PolarisBaseEntity> entities) { + public EntitiesResult( + @Nonnull List<PolarisBaseEntity> entities, @Nonnull Optional<PageToken> pageTokenOpt) { super(ReturnStatus.SUCCESS); this.entities = entities; + this.pageTokenOpt = pageTokenOpt; } @JsonCreator private EntitiesResult( 
@JsonProperty("returnStatus") @Nonnull ReturnStatus returnStatus, @JsonProperty("extraInformation") String extraInformation, - @JsonProperty("entities") List<PolarisBaseEntity> entities) { + @JsonProperty("entities") List<PolarisBaseEntity> entities, + @JsonProperty("pageToken") Optional<PageToken> pageTokenOpt) { super(returnStatus, extraInformation); this.entities = entities; + this.pageTokenOpt = pageTokenOpt; } public List<PolarisBaseEntity> getEntities() { return entities; } + + public Optional<PageToken> getPageToken() { + return pageTokenOpt; + } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java index bc51f4dab..10669e899 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ListEntitiesResult.java @@ -23,13 +23,22 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.List; +import java.util.Optional; import org.apache.polaris.core.entity.EntityNameLookupRecord; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; /** the return the result for a list entities call */ public class ListEntitiesResult extends BaseResult { // null if not success. 
Else the list of entities being returned private final List<EntityNameLookupRecord> entities; + private final Optional<PageToken> pageTokenOpt; + + /** Create a {@link ListEntitiesResult} from a {@link Page} */ + public static ListEntitiesResult fromPage(Page<EntityNameLookupRecord> page) { + return new ListEntitiesResult(page.items, Optional.ofNullable(page.pageToken)); + } /** * Constructor for an error @@ -37,9 +46,13 @@ public class ListEntitiesResult extends BaseResult { * @param errorCode error code, cannot be SUCCESS * @param extraInformation extra information */ - public ListEntitiesResult(@Nonnull ReturnStatus errorCode, @Nullable String extraInformation) { + public ListEntitiesResult( + @Nonnull ReturnStatus errorCode, + @Nullable String extraInformation, + @Nonnull Optional<PageToken> pageTokenOpt) { super(errorCode, extraInformation); this.entities = null; + this.pageTokenOpt = pageTokenOpt; } /** @@ -47,21 +60,29 @@ public class ListEntitiesResult extends BaseResult { * * @param entities list of entities being returned, implies success */ - public ListEntitiesResult(@Nonnull List<EntityNameLookupRecord> entities) { + public ListEntitiesResult( + @Nonnull List<EntityNameLookupRecord> entities, @Nonnull Optional<PageToken> pageTokenOpt) { super(ReturnStatus.SUCCESS); this.entities = entities; + this.pageTokenOpt = pageTokenOpt; } @JsonCreator private ListEntitiesResult( @JsonProperty("returnStatus") @Nonnull ReturnStatus returnStatus, @JsonProperty("extraInformation") String extraInformation, - @JsonProperty("entities") List<EntityNameLookupRecord> entities) { + @JsonProperty("entities") List<EntityNameLookupRecord> entities, + @JsonProperty("pageToken") Optional<PageToken> pageTokenOpt) { super(returnStatus, extraInformation); this.entities = entities; + this.pageTokenOpt = pageTokenOpt; } public List<EntityNameLookupRecord> getEntities() { return entities; } + + public Optional<PageToken> getPageToken() { + return pageTokenOpt; + } } diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java new file mode 100644 index 000000000..d46ea7b02 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/DonePageToken.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.pagination; + +import java.util.List; + +/** + * A {@link PageToken} string that represents the lack of a page token. Returns `null` in + * `toTokenString`, which the client will interpret as there being no more data available. 
+ */ +public class DonePageToken extends PageToken { + + public DonePageToken() {} + + @Override + public String toTokenString() { + return null; + } + + @Override + protected PageToken updated(List<?> newData) { + throw new IllegalStateException("DonePageToken.updated is invalid"); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java new file mode 100644 index 000000000..c6b216fcd --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/HasPageSize.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.core.persistence.pagination; + +/** + * A light interface for {@link PageToken} implementations to express that they have a page size + * that should be respected + */ +public interface HasPageSize { + int getPageSize(); +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java new file mode 100644 index 000000000..18586446c --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/LimitPageToken.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.pagination; + +import java.util.List; + +/** + * A {@link PageToken} implementation that has a page size, but no start offset. This can be used to + * represent a `limit`. When updated, it returns {@link DonePageToken}. As such it should never be + * user-facing and doesn't truly paginate. 
+ */ +public class LimitPageToken extends PageToken implements HasPageSize { + + public static final String PREFIX = "limit"; + + private final int pageSize; + + public LimitPageToken(int pageSize) { + this.pageSize = pageSize; + } + + @Override + public int getPageSize() { + return pageSize; + } + + @Override + public String toTokenString() { + return String.format("%s/%d", PREFIX, pageSize); + } + + @Override + protected PageToken updated(List<?> newData) { + return new DonePageToken(); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java new file mode 100644 index 000000000..18287f85c --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/Page.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.pagination; + +import java.util.List; + +/** + * An immutable page of items plus their paging cursor. The {@link PageToken} here can be used to + * continue the listing operation that generated the `items`. 
+ */ +public class Page<T> { + public final PageToken pageToken; + public final List<T> items; + + public Page(PageToken pageToken, List<T> items) { + this.pageToken = pageToken; + this.items = items; + } + + /** + * Used to wrap a {@link List<T>} of items into a {@link Page <T>} when there are no more pages + */ + public static <T> Page<T> fromItems(List<T> items) { + return new Page<>(new DonePageToken(), items); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java new file mode 100644 index 000000000..2e335ccd4 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/PageToken.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.pagination; + +import java.util.List; +import java.util.Objects; + +/** + * Represents a page token that can be used by operations like `listTables`. Clients that specify a + * `pageSize` (or a `pageToken`) may receive a `next-page-token` in the response, the content of + * which is a serialized PageToken. 
+ * + * <p>By providing that in the next query's `pageToken`, the client can resume listing where they + * left off. If the client provides a `pageToken` or `pageSize` but `next-page-token` is null in the + * response, that means there is no more data to read. + */ +public abstract class PageToken { + + /** Build a new PageToken that reads everything */ + public static PageToken readEverything() { + return build(null, null); + } + + /** Build a new PageToken from an input String, without a specified page size */ + public static PageToken fromString(String token) { + return build(token, null); + } + + /** Build a new PageToken from a limit */ + public static PageToken fromLimit(Integer pageSize) { + return build(null, pageSize); + } + + /** Build a {@link PageToken} from the input string and page size */ + public static PageToken build(String token, Integer pageSize) { + if (token == null || token.isEmpty()) { + if (pageSize != null) { + return new LimitPageToken(pageSize); + } else { + return new ReadEverythingPageToken(); + } + } else { + // TODO implement, split out by the token's prefix + throw new IllegalArgumentException("Unrecognized page token: " + token); + } + } + + /** Serialize a {@link PageToken} into a string */ + public abstract String toTokenString(); + + /** + * Builds a new page token to reflect new data that's been read. If the amount of data read is + * less than the pageSize, this will return a {@link DonePageToken} + */ + protected abstract PageToken updated(List<?> newData); + + /** + * Builds a {@link Page <T>} from a {@link List<T>}. The {@link PageToken} attached to the new + * {@link Page <T>} is the same as the result of calling {@link #updated(List)} on this {@link + * PageToken}. 
+ */ + public final <T> Page<T> buildNextPage(List<T> data) { + return new Page<T>(updated(data), data); + } + + @Override + public final boolean equals(Object o) { + if (o instanceof PageToken) { + return Objects.equals(this.toTokenString(), ((PageToken) o).toTokenString()); + } else { + return false; + } + } + + @Override + public final int hashCode() { + if (toTokenString() == null) { + return 0; + } else { + return toTokenString().hashCode(); + } + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java new file mode 100644 index 000000000..c8476c351 --- /dev/null +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/pagination/ReadEverythingPageToken.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.persistence.pagination; + +import java.util.List; + +/** + * A {@link PageToken} implementation for readers who want to read everything. The behavior when + * using this token should be the same as when reading without a token. 
+ */ +public class ReadEverythingPageToken extends PageToken { + + public static String PREFIX = "read-everything"; + + public ReadEverythingPageToken() {} + + @Override + public String toTokenString() { + return PREFIX; + } + + @Override + protected PageToken updated(List<?> newData) { + return new DonePageToken(); + } +} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java index e949b33fe..e63ea6fed 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java @@ -37,6 +37,8 @@ import org.apache.polaris.core.entity.PolarisPrincipalSecrets; import org.apache.polaris.core.persistence.EntityAlreadyExistsException; import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException; import org.apache.polaris.core.persistence.RetryOnConcurrencyException; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyType; import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; @@ -352,46 +354,50 @@ public abstract class AbstractTransactionalPersistence implements TransactionalP /** {@inheritDoc} */ @Override @Nonnull - public List<EntityNameLookupRecord> listEntities( + public Page<EntityNameLookupRecord> listEntities( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, - @Nonnull PolarisEntityType entityType) { + @Nonnull PolarisEntityType entityType, + @Nonnull PageToken pageToken) { return runInReadTransaction( - callCtx, () -> this.listEntitiesInCurrentTxn(callCtx, catalogId, 
parentId, entityType)); + callCtx, + () -> this.listEntitiesInCurrentTxn(callCtx, catalogId, parentId, entityType, pageToken)); } /** {@inheritDoc} */ @Override @Nonnull - public List<EntityNameLookupRecord> listEntities( + public Page<EntityNameLookupRecord> listEntities( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - @Nonnull Predicate<PolarisBaseEntity> entityFilter) { + @Nonnull Predicate<PolarisBaseEntity> entityFilter, + @Nonnull PageToken pageToken) { return runInReadTransaction( callCtx, () -> - this.listEntitiesInCurrentTxn(callCtx, catalogId, parentId, entityType, entityFilter)); + this.listEntitiesInCurrentTxn( + callCtx, catalogId, parentId, entityType, entityFilter, pageToken)); } /** {@inheritDoc} */ @Override @Nonnull - public <T> List<T> listEntities( + public <T> Page<T> listEntities( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - int limit, @Nonnull Predicate<PolarisBaseEntity> entityFilter, - @Nonnull Function<PolarisBaseEntity, T> transformer) { + @Nonnull Function<PolarisBaseEntity, T> transformer, + @Nonnull PageToken pageToken) { return runInReadTransaction( callCtx, () -> this.listEntitiesInCurrentTxn( - callCtx, catalogId, parentId, entityType, limit, entityFilter, transformer)); + callCtx, catalogId, parentId, entityType, entityFilter, transformer, pageToken)); } /** {@inheritDoc} */ diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index 6eb48c12e..62f526a6d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -28,6 +28,7 @@ 
import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -63,6 +64,8 @@ import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyMappingUtil; @@ -677,37 +680,41 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { } /** - * See {@link #listEntities(PolarisCallContext, List, PolarisEntityType, PolarisEntitySubType)} + * See {@link #listEntities(PolarisCallContext, List, PolarisEntityType, PolarisEntitySubType, + * PageToken)} */ private @Nonnull ListEntitiesResult listEntities( @Nonnull PolarisCallContext callCtx, @Nonnull TransactionalPersistence ms, @Nullable List<PolarisEntityCore> catalogPath, @Nonnull PolarisEntityType entityType, - @Nonnull PolarisEntitySubType entitySubType) { + @Nonnull PolarisEntitySubType entitySubType, + @Nonnull PageToken pageToken) { // first resolve again the catalogPath to that entity PolarisEntityResolver resolver = new PolarisEntityResolver(callCtx, ms, catalogPath); // return if we failed to resolve if (resolver.isFailure()) { - return new ListEntitiesResult(BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED, null); + return new ListEntitiesResult( + BaseResult.ReturnStatus.CATALOG_PATH_CANNOT_BE_RESOLVED, null, Optional.empty()); } // return list of active entities - List<EntityNameLookupRecord> toreturnList = 
+ Page<EntityNameLookupRecord> resultPage = ms.listEntitiesInCurrentTxn( - callCtx, resolver.getCatalogIdOrNull(), resolver.getParentId(), entityType); + callCtx, resolver.getCatalogIdOrNull(), resolver.getParentId(), entityType, pageToken); // prune the returned list with only entities matching the entity subtype if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) { - toreturnList = - toreturnList.stream() - .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) - .collect(Collectors.toList()); + resultPage = + pageToken.buildNextPage( + resultPage.items.stream() + .filter(rec -> rec.getSubTypeCode() == entitySubType.getCode()) + .collect(Collectors.toList())); } // done - return new ListEntitiesResult(toreturnList); + return ListEntitiesResult.fromPage(resultPage); } /** {@inheritDoc} */ @@ -716,13 +723,15 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { @Nonnull PolarisCallContext callCtx, @Nullable List<PolarisEntityCore> catalogPath, @Nonnull PolarisEntityType entityType, - @Nonnull PolarisEntitySubType entitySubType) { + @Nonnull PolarisEntitySubType entitySubType, + @Nonnull PageToken pageToken) { // get meta store we should be using TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore()); // run operation in a read transaction return ms.runInReadTransaction( - callCtx, () -> listEntities(callCtx, ms, catalogPath, entityType, entitySubType)); + callCtx, + () -> listEntities(callCtx, ms, catalogPath, entityType, entitySubType, pageToken)); } /** {@link #createPrincipal(PolarisCallContext, PolarisBaseEntity)} */ @@ -1359,13 +1368,14 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { // get the list of catalog roles, at most 2 List<PolarisBaseEntity> catalogRoles = ms.listEntitiesInCurrentTxn( - callCtx, - catalogId, - catalogId, - PolarisEntityType.CATALOG_ROLE, - 2, - entity -> true, - Function.identity()); + callCtx, + catalogId, + catalogId, + 
PolarisEntityType.CATALOG_ROLE, + entity -> true, + Function.identity(), + PageToken.fromLimit(2)) + .items; // if we have 2, we cannot drop the catalog. If only one left, better be the admin role if (catalogRoles.size() > 1) { @@ -1919,21 +1929,20 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { () -> this.loadEntity(callCtx, ms, entityCatalogId, entityId, entityType.getCode())); } - /** Refer to {@link #loadTasks(PolarisCallContext, String, int)} */ + /** Refer to {@link #loadTasks(PolarisCallContext, String, PageToken)} */ private @Nonnull EntitiesResult loadTasks( @Nonnull PolarisCallContext callCtx, @Nonnull TransactionalPersistence ms, String executorId, - int limit) { + PageToken pageToken) { // find all available tasks - List<PolarisBaseEntity> availableTasks = + Page<PolarisBaseEntity> availableTasks = ms.listEntitiesInCurrentTxn( callCtx, PolarisEntityConstants.getRootEntityId(), PolarisEntityConstants.getRootEntityId(), PolarisEntityType.TASK, - limit, entity -> { PolarisObjectMapperUtil.TaskExecutionState taskState = PolarisObjectMapperUtil.parseTaskState(entity); @@ -1948,10 +1957,11 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { || taskState.executor == null || callCtx.getClock().millis() - taskState.lastAttemptStartTime > taskAgeTimeout; }, - Function.identity()); + Function.identity(), + pageToken); List<PolarisBaseEntity> loadedTasks = new ArrayList<>(); - availableTasks.forEach( + availableTasks.items.forEach( task -> { // Make a copy to avoid mutating someone else's reference. // TODO: Refactor into immutable/Builder pattern. 
@@ -1982,14 +1992,14 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { result.getReturnStatus(), result.getExtraInformation()); } }); - return new EntitiesResult(loadedTasks); + return EntitiesResult.fromPage(Page.fromItems(loadedTasks)); } @Override public @Nonnull EntitiesResult loadTasks( - @Nonnull PolarisCallContext callCtx, String executorId, int limit) { + @Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken) { TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore()); - return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, limit)); + return ms.runInTransaction(callCtx, () -> this.loadTasks(callCtx, ms, executorId, pageToken)); } /** {@inheritDoc} */ diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java index 2057991db..1c58334d5 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java @@ -36,6 +36,8 @@ import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; import org.apache.polaris.core.persistence.BasePersistence; import org.apache.polaris.core.persistence.IntegrationPersistence; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.TransactionalPolicyMappingPersistence; import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.core.storage.PolarisStorageIntegration; @@ -201,31 +203,33 @@ public interface TransactionalPersistence /** See {@link 
org.apache.polaris.core.persistence.BasePersistence#listEntities} */ @Nonnull - List<EntityNameLookupRecord> listEntitiesInCurrentTxn( + Page<EntityNameLookupRecord> listEntitiesInCurrentTxn( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, - @Nonnull PolarisEntityType entityType); + @Nonnull PolarisEntityType entityType, + @Nonnull PageToken pageToken); /** See {@link org.apache.polaris.core.persistence.BasePersistence#listEntities} */ @Nonnull - List<EntityNameLookupRecord> listEntitiesInCurrentTxn( + Page<EntityNameLookupRecord> listEntitiesInCurrentTxn( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - @Nonnull Predicate<PolarisBaseEntity> entityFilter); + @Nonnull Predicate<PolarisBaseEntity> entityFilter, + @Nonnull PageToken pageToken); /** See {@link org.apache.polaris.core.persistence.BasePersistence#listEntities} */ @Nonnull - <T> List<T> listEntitiesInCurrentTxn( + <T> Page<T> listEntitiesInCurrentTxn( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - int limit, @Nonnull Predicate<PolarisBaseEntity> entityFilter, - @Nonnull Function<PolarisBaseEntity, T> transformer); + @Nonnull Function<PolarisBaseEntity, T> transformer, + @Nonnull PageToken pageToken); /** * See {@link org.apache.polaris.core.persistence.BasePersistence#lookupEntityGrantRecordsVersion} diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java index 39ce364d3..304ac0ce9 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java @@ -26,6 +26,7 @@ import 
java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.entity.EntityNameLookupRecord; import org.apache.polaris.core.entity.PolarisBaseEntity; @@ -38,6 +39,9 @@ import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.entity.PolarisPrincipalSecrets; import org.apache.polaris.core.persistence.BaseMetaStoreManager; import org.apache.polaris.core.persistence.PrincipalSecretsGenerator; +import org.apache.polaris.core.persistence.pagination.HasPageSize; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo; import org.apache.polaris.core.storage.PolarisStorageIntegration; @@ -301,29 +305,30 @@ public class TreeMapTransactionalPersistenceImpl extends AbstractTransactionalPe /** {@inheritDoc} */ @Override - public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn( + public @Nonnull Page<EntityNameLookupRecord> listEntitiesInCurrentTxn( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, - @Nonnull PolarisEntityType entityType) { + @Nonnull PolarisEntityType entityType, + @Nonnull PageToken pageToken) { return this.listEntitiesInCurrentTxn( - callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue()); + callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(), pageToken); } @Override - public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn( + public @Nonnull Page<EntityNameLookupRecord> listEntitiesInCurrentTxn( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - @Nonnull Predicate<PolarisBaseEntity> entityFilter) { + 
@Nonnull Predicate<PolarisBaseEntity> entityFilter, + @Nonnull PageToken pageToken) { // full range scan under the parent for that type return this.listEntitiesInCurrentTxn( callCtx, catalogId, parentId, entityType, - Integer.MAX_VALUE, entityFilter, entity -> new EntityNameLookupRecord( @@ -332,31 +337,36 @@ public class TreeMapTransactionalPersistenceImpl extends AbstractTransactionalPe entity.getParentId(), entity.getName(), entity.getTypeCode(), - entity.getSubTypeCode())); + entity.getSubTypeCode()), + pageToken); } @Override - public @Nonnull <T> List<T> listEntitiesInCurrentTxn( + public @Nonnull <T> Page<T> listEntitiesInCurrentTxn( @Nonnull PolarisCallContext callCtx, long catalogId, long parentId, @Nonnull PolarisEntityType entityType, - int limit, @Nonnull Predicate<PolarisBaseEntity> entityFilter, - @Nonnull Function<PolarisBaseEntity, T> transformer) { + @Nonnull Function<PolarisBaseEntity, T> transformer, + @Nonnull PageToken pageToken) { // full range scan under the parent for that type - return this.store - .getSliceEntitiesActive() - .readRange(this.store.buildPrefixKeyComposite(catalogId, parentId, entityType.getCode())) - .stream() - .map( - nameRecord -> - this.lookupEntityInCurrentTxn( - callCtx, catalogId, nameRecord.getId(), entityType.getCode())) - .filter(entityFilter) - .limit(limit) - .map(transformer) - .collect(Collectors.toList()); + Stream<PolarisBaseEntity> data = + this.store + .getSliceEntitiesActive() + .readRange( + this.store.buildPrefixKeyComposite(catalogId, parentId, entityType.getCode())) + .stream() + .map( + nameRecord -> + this.lookupEntityInCurrentTxn( + callCtx, catalogId, nameRecord.getId(), entityType.getCode())) + .filter(entityFilter); + if (pageToken instanceof HasPageSize) { + data = data.limit(((HasPageSize) pageToken).getPageSize()); + } + + return Page.fromItems(data.map(transformer).collect(Collectors.toList())); } /** {@inheritDoc} */ diff --git 
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java index f58313712..0f834bc76 100644 --- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java +++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java @@ -43,6 +43,7 @@ import org.apache.polaris.core.entity.PolarisEntity; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.TaskEntity; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.assertj.core.api.Assertions; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.BeforeEach; @@ -128,7 +129,8 @@ public abstract class BasePolarisMetaStoreManagerTest { polarisTestMetaStoreManager.polarisCallContext, null, PolarisEntityType.TASK, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) .getEntities(); Assertions.assertThat(listedEntities) .isNotNull() @@ -307,7 +309,7 @@ public abstract class BasePolarisMetaStoreManagerTest { PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager; PolarisCallContext callCtx = polarisTestMetaStoreManager.polarisCallContext; List<PolarisBaseEntity> taskList = - metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities(); + metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(5)).getEntities(); Assertions.assertThat(taskList) .isNotNull() .isNotEmpty() @@ -327,7 +329,7 @@ public abstract class BasePolarisMetaStoreManagerTest { // grab a second round of tasks. 
Assert that none of the original 5 are in the list List<PolarisBaseEntity> newTaskList = - metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities(); + metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(5)).getEntities(); Assertions.assertThat(newTaskList) .isNotNull() .isNotEmpty() @@ -341,7 +343,7 @@ public abstract class BasePolarisMetaStoreManagerTest { // only 10 tasks are unassigned. Requesting 20, we should only receive those 10 List<PolarisBaseEntity> lastTen = - metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities(); + metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(20)).getEntities(); Assertions.assertThat(lastTen) .isNotNull() @@ -355,7 +357,7 @@ public abstract class BasePolarisMetaStoreManagerTest { .collect(Collectors.toSet()); List<PolarisBaseEntity> emtpyList = - metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities(); + metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(20)).getEntities(); Assertions.assertThat(emtpyList).isNotNull().isEmpty(); @@ -363,7 +365,7 @@ public abstract class BasePolarisMetaStoreManagerTest { // all the tasks are unassigned. 
Fetch them all List<PolarisBaseEntity> allTasks = - metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities(); + metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(20)).getEntities(); Assertions.assertThat(allTasks) .isNotNull() @@ -378,7 +380,7 @@ public abstract class BasePolarisMetaStoreManagerTest { timeSource.add(Duration.ofMinutes(10)); List<PolarisBaseEntity> finalList = - metaStoreManager.loadTasks(callCtx, executorId, 20).getEntities(); + metaStoreManager.loadTasks(callCtx, executorId, PageToken.fromLimit(20)).getEntities(); Assertions.assertThat(finalList).isNotNull().isEmpty(); } @@ -406,7 +408,10 @@ public abstract class BasePolarisMetaStoreManagerTest { do { retry = false; try { - taskList = metaStoreManager.loadTasks(callCtx, executorId, 5).getEntities(); + taskList = + metaStoreManager + .loadTasks(callCtx, executorId, PageToken.fromLimit(5)) + .getEntities(); taskList.stream().map(PolarisBaseEntity::getName).forEach(taskNames::add); } catch (RetryOnConcurrencyException e) { retry = true; diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java index da72308a0..8cddecae3 100644 --- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java +++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java @@ -51,6 +51,7 @@ import org.apache.polaris.core.persistence.dao.entity.LoadGrantsResult; import org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult; import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.policy.PolarisPolicyMappingRecord; import 
org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyType; @@ -766,7 +767,8 @@ public class PolarisTestMetaStoreManager { this.polarisCallContext, path, PolarisEntityType.NAMESPACE, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) .getEntities(); Assertions.assertThat(children).isNotNull(); if (children.isEmpty() && entity.getType() == PolarisEntityType.NAMESPACE) { @@ -776,7 +778,8 @@ public class PolarisTestMetaStoreManager { this.polarisCallContext, path, PolarisEntityType.TABLE_LIKE, - PolarisEntitySubType.ANY_SUBTYPE) + PolarisEntitySubType.ANY_SUBTYPE, + PageToken.readEverything()) .getEntities(); Assertions.assertThat(children).isNotNull(); } else if (children.isEmpty()) { @@ -786,7 +789,8 @@ public class PolarisTestMetaStoreManager { this.polarisCallContext, path, PolarisEntityType.CATALOG_ROLE, - PolarisEntitySubType.ANY_SUBTYPE) + PolarisEntitySubType.ANY_SUBTYPE, + PageToken.readEverything()) .getEntities(); Assertions.assertThat(children).isNotNull(); // if only one left, it can be dropped. 
@@ -1555,7 +1559,12 @@ public class PolarisTestMetaStoreManager { // list the entities under the specified path List<EntityNameLookupRecord> result = polarisMetaStoreManager - .listEntities(this.polarisCallContext, path, entityType, entitySubType) + .listEntities( + this.polarisCallContext, + path, + entityType, + entitySubType, + PageToken.readEverything()) .getEntities(); Assertions.assertThat(result).isNotNull(); @@ -1872,7 +1881,8 @@ public class PolarisTestMetaStoreManager { this.polarisCallContext, null, PolarisEntityType.PRINCIPAL, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) .getEntities(); // ensure not null, one element only @@ -1898,7 +1908,8 @@ public class PolarisTestMetaStoreManager { this.polarisCallContext, null, PolarisEntityType.PRINCIPAL_ROLE, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) .getEntities(); // ensure not null, one element only @@ -2636,7 +2647,8 @@ public class PolarisTestMetaStoreManager { this.polarisCallContext, null, PolarisEntityType.PRINCIPAL, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) .getEntities(); // ensure not null, one element only diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java index d81edd1d4..5837f8822 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java @@ -97,6 +97,7 @@ import org.apache.polaris.core.persistence.cache.InMemoryEntityCache; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; import 
org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.persistence.transactional.TransactionalPersistence; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.secrets.UserSecretsManagerFactory; @@ -171,6 +172,8 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { "true", "polaris.features.defaults.\"SUPPORTED_CATALOG_STORAGE_TYPES\"", "[\"FILE\"]", + "polaris.features.defaults.\"LIST_PAGINATION_ENABLED\"", + "true", "polaris.event-listener.type", "test"); } @@ -1536,7 +1539,9 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { .as("Table should not exist after drop") .rejects(TABLE); List<PolarisBaseEntity> tasks = - metaStoreManager.loadTasks(polarisContext, "testExecutor", 1).getEntities(); + metaStoreManager + .loadTasks(polarisContext, "testExecutor", PageToken.fromLimit(1)) + .getEntities(); Assertions.assertThat(tasks).hasSize(1); TaskEntity taskEntity = TaskEntity.of(tasks.get(0)); EnumMap<StorageAccessProperty, String> credentials = @@ -1745,7 +1750,8 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { TaskEntity taskEntity = TaskEntity.of( metaStoreManager - .loadTasks(callContext.getPolarisCallContext(), "testExecutor", 1) + .loadTasks( + callContext.getPolarisCallContext(), "testExecutor", PageToken.fromLimit(1)) .getEntities() .getFirst()); Map<String, String> properties = taskEntity.getInternalPropertiesAsMap(); diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java index 1c1f1441e..2bb53b40c 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java +++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java @@ -54,6 +54,7 @@ import org.apache.polaris.core.entity.table.IcebergTableLikeEntity; import org.apache.polaris.core.persistence.BasePersistence; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.storage.PolarisStorageActions; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.task.BatchFileCleanupTaskHandler; @@ -153,7 +154,7 @@ class TableCleanupTaskHandlerTest { assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(callContext.getPolarisCallContext(), "test", 2) + .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(2)) .getEntities()) .hasSize(2) .satisfiesExactlyInAnyOrder( @@ -233,7 +234,7 @@ class TableCleanupTaskHandlerTest { assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(callContext.getPolarisCallContext(), "test", 5) + .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(5)) .getEntities()) .hasSize(2); } @@ -294,10 +295,10 @@ class TableCleanupTaskHandlerTest { assertThat( metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(callContext.getPolarisCallContext(), "test", 5) + .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(5)) .getEntities()) .hasSize(4) - .satisfiesExactly( + .satisfiesExactlyInAnyOrder( taskEntity -> assertThat(taskEntity) .returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode) @@ -414,7 +415,7 @@ class TableCleanupTaskHandlerTest { List<PolarisBaseEntity> entities = metaStoreManagerFactory .getOrCreateMetaStoreManager(realmContext) - .loadTasks(callContext.getPolarisCallContext(), "test", 5) + 
.loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(5)) .getEntities(); List<PolarisBaseEntity> manifestCleanupTasks = @@ -573,7 +574,7 @@ class TableCleanupTaskHandlerTest { List<PolarisBaseEntity> entities = metaStoreManagerFactory .getOrCreateMetaStoreManager(callContext.getRealmContext()) - .loadTasks(callContext.getPolarisCallContext(), "test", 6) + .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(6)) .getEntities(); List<PolarisBaseEntity> manifestCleanupTasks = diff --git a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java index b814d902f..ef6a754e9 100644 --- a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java +++ b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java @@ -95,6 +95,7 @@ import org.apache.polaris.core.persistence.dao.entity.CreatePrincipalResult; import org.apache.polaris.core.persistence.dao.entity.DropEntityResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; import org.apache.polaris.core.persistence.dao.entity.LoadGrantsResult; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest; import org.apache.polaris.core.persistence.resolver.ResolverPath; import org.apache.polaris.core.persistence.resolver.ResolverStatus; @@ -885,7 +886,8 @@ public class PolarisAdminService { getCurrentPolarisContext(), null, PolarisEntityType.CATALOG, - PolarisEntitySubType.ANY_SUBTYPE) + PolarisEntitySubType.ANY_SUBTYPE, + PageToken.readEverything()) .getEntities() .stream() .map( @@ -1051,7 +1053,8 @@ public class PolarisAdminService { getCurrentPolarisContext(), null, PolarisEntityType.PRINCIPAL, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) 
.getEntities() .stream() .map( @@ -1160,7 +1163,8 @@ public class PolarisAdminService { getCurrentPolarisContext(), null, PolarisEntityType.PRINCIPAL_ROLE, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) .getEntities() .stream() .map( @@ -1288,7 +1292,8 @@ public class PolarisAdminService { getCurrentPolarisContext(), PolarisEntity.toCoreList(List.of(catalogEntity)), PolarisEntityType.CATALOG_ROLE, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) .getEntities() .stream() .map( diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalog.java index b2fb31f67..79adeaee8 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalog.java @@ -37,6 +37,7 @@ import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.DropEntityResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -175,7 +176,8 @@ public class GenericTableCatalog { this.callContext.getPolarisCallContext(), PolarisEntity.toCoreList(catalogPath), PolarisEntityType.TABLE_LIKE, - PolarisEntitySubType.GENERIC_TABLE) + PolarisEntitySubType.GENERIC_TABLE, + PageToken.readEverything()) .getEntities()); return PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath, entities); } diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index c66def885..7c02a6154 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -109,6 +109,8 @@ import org.apache.polaris.core.persistence.dao.entity.BaseResult; import org.apache.polaris.core.persistence.dao.entity.DropEntityResult; import org.apache.polaris.core.persistence.dao.entity.EntityResult; import org.apache.polaris.core.persistence.dao.entity.ListEntitiesResult; +import org.apache.polaris.core.persistence.pagination.Page; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; import org.apache.polaris.core.persistence.resolver.ResolverPath; @@ -500,12 +502,20 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog @Override public List<TableIdentifier> listTables(Namespace namespace) { + return listTables(namespace, PageToken.readEverything()).items; + } + + public Page<TableIdentifier> listTables(Namespace namespace, String pageToken, Integer pageSize) { + return listTables(namespace, buildPageToken(pageToken, pageSize)); + } + + private Page<TableIdentifier> listTables(Namespace namespace, PageToken pageToken) { if (!namespaceExists(namespace)) { throw new NoSuchNamespaceException( "Cannot list tables for namespace. 
Namespace does not exist: '%s'", namespace); } - return listTableLike(PolarisEntitySubType.ICEBERG_TABLE, namespace); + return listTableLike(PolarisEntitySubType.ICEBERG_TABLE, namespace, pageToken); } @Override @@ -815,22 +825,36 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog @Override public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + return listNamespaces(namespace, PageToken.readEverything()).items; + } + + public Page<Namespace> listNamespaces(Namespace namespace, String pageToken, Integer pageSize) { + return listNamespaces(namespace, buildPageToken(pageToken, pageSize)); + } + + private Page<Namespace> listNamespaces(Namespace namespace, PageToken pageToken) + throws NoSuchNamespaceException { PolarisResolvedPathWrapper resolvedEntities = resolvedEntityView.getResolvedPath(namespace); if (resolvedEntities == null) { throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace); } List<PolarisEntity> catalogPath = resolvedEntities.getRawFullPath(); + ListEntitiesResult listResult = + getMetaStoreManager() + .listEntities( + getCurrentPolarisContext(), + PolarisEntity.toCoreList(catalogPath), + PolarisEntityType.NAMESPACE, + PolarisEntitySubType.NULL_SUBTYPE, + pageToken); List<PolarisEntity.NameAndId> entities = - PolarisEntity.toNameAndIdList( - getMetaStoreManager() - .listEntities( - getCurrentPolarisContext(), - PolarisEntity.toCoreList(catalogPath), - PolarisEntityType.NAMESPACE, - PolarisEntitySubType.NULL_SUBTYPE) - .getEntities()); - return PolarisCatalogHelpers.nameAndIdToNamespaces(catalogPath, entities); + PolarisEntity.toNameAndIdList(listResult.getEntities()); + List<Namespace> namespaces = PolarisCatalogHelpers.nameAndIdToNamespaces(catalogPath, entities); + return listResult + .getPageToken() + .map(token -> new Page<>(token, namespaces)) + .orElseGet(() -> Page.fromItems(namespaces)); } @Override @@ -842,12 +866,20 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog @Override public List<TableIdentifier> listViews(Namespace namespace) { + return listViews(namespace, PageToken.readEverything()).items; + } + + public Page<TableIdentifier> listViews(Namespace namespace, String pageToken, Integer pageSize) { + return listViews(namespace, buildPageToken(pageToken, pageSize)); + } + + private Page<TableIdentifier> listViews(Namespace namespace, PageToken pageToken) { if (!namespaceExists(namespace)) { throw new NoSuchNamespaceException( "Cannot list views for namespace. Namespace does not exist: '%s'", namespace); } - return listTableLike(PolarisEntitySubType.ICEBERG_VIEW, namespace); + return listTableLike(PolarisEntitySubType.ICEBERG_VIEW, namespace, pageToken); } @Override @@ -1074,7 +1106,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog callContext.getPolarisCallContext(), parentPath.stream().map(PolarisEntity::toCore).collect(Collectors.toList()), PolarisEntityType.NAMESPACE, - PolarisEntitySubType.ANY_SUBTYPE); + PolarisEntitySubType.ANY_SUBTYPE, + PageToken.readEverything()); if (!siblingNamespacesResult.isSuccess()) { throw new IllegalStateException( "Unable to resolve siblings entities to validate location - could not list namespaces"); @@ -1099,7 +1132,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog .map(PolarisEntity::toCore) .collect(Collectors.toList()), PolarisEntityType.TABLE_LIKE, - PolarisEntitySubType.ANY_SUBTYPE); + PolarisEntitySubType.ANY_SUBTYPE, + PageToken.readEverything()); if (!siblingTablesResult.isSuccess()) { throw new IllegalStateException( "Unable to resolve siblings entities to validate location - could not list tables"); @@ -2458,7 +2492,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog } } - private List<TableIdentifier> listTableLike(PolarisEntitySubType subType, Namespace namespace) { + private Page<TableIdentifier> listTableLike( + PolarisEntitySubType subType, Namespace namespace, PageToken pageToken) { 
PolarisResolvedPathWrapper resolvedEntities = resolvedEntityView.getResolvedPath(namespace); if (resolvedEntities == null) { // Illegal state because the namespace should've already been in the static resolution set. @@ -2467,16 +2502,23 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog } List<PolarisEntity> catalogPath = resolvedEntities.getRawFullPath(); + ListEntitiesResult listResult = + getMetaStoreManager() + .listEntities( + getCurrentPolarisContext(), + PolarisEntity.toCoreList(catalogPath), + PolarisEntityType.TABLE_LIKE, + subType, + pageToken); List<PolarisEntity.NameAndId> entities = - PolarisEntity.toNameAndIdList( - getMetaStoreManager() - .listEntities( - getCurrentPolarisContext(), - PolarisEntity.toCoreList(catalogPath), - PolarisEntityType.TABLE_LIKE, - subType) - .getEntities()); - return PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath, entities); + PolarisEntity.toNameAndIdList(listResult.getEntities()); + List<TableIdentifier> identifiers = + PolarisCatalogHelpers.nameAndIdToTableIdentifiers(catalogPath, entities); + + return listResult + .getPageToken() + .map(token -> new Page<>(token, identifiers)) + .orElseGet(() -> Page.fromItems(identifiers)); } /** @@ -2524,4 +2566,22 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog .getConfiguration( callContext.getPolarisCallContext(), FeatureConfiguration.MAX_METADATA_REFRESH_RETRIES); } + + /** Build a {@link PageToken} from a string and page size. 
*/ + private PageToken buildPageToken(@Nullable String tokenString, @Nullable Integer pageSize) { + + boolean paginationEnabled = + callContext + .getPolarisCallContext() + .getConfigurationStore() + .getConfiguration( + callContext.getPolarisCallContext(), + catalogEntity, + FeatureConfiguration.LIST_PAGINATION_ENABLED); + if (!paginationEnabled) { + return PageToken.readEverything(); + } else { + return PageToken.build(tokenString, pageSize); + } + } } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index 5baffa395..38026c0fa 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -220,7 +220,10 @@ public class IcebergCatalogAdapter securityContext, prefix, catalog -> - Response.ok(catalog.listNamespaces(namespaceOptional.orElse(Namespace.of()))).build()); + Response.ok( + catalog.listNamespaces( + namespaceOptional.orElse(Namespace.of()), pageToken, pageSize)) + .build()); } @Override @@ -356,7 +359,9 @@ public class IcebergCatalogAdapter SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); return withCatalog( - securityContext, prefix, catalog -> Response.ok(catalog.listTables(ns)).build()); + securityContext, + prefix, + catalog -> Response.ok(catalog.listTables(ns, pageToken, pageSize)).build()); } @Override @@ -525,7 +530,9 @@ public class IcebergCatalogAdapter SecurityContext securityContext) { Namespace ns = decodeNamespace(namespace); return withCatalog( - securityContext, prefix, catalog -> Response.ok(catalog.listViews(ns)).build()); + securityContext, + prefix, + catalog -> Response.ok(catalog.listViews(ns, pageToken, pageSize)).build()); } @Override diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index dbea9f4d7..158732c32 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -90,6 +90,7 @@ import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.persistence.TransactionWorkspaceMetaStoreManager; import org.apache.polaris.core.persistence.dao.entity.EntitiesResult; import org.apache.polaris.core.persistence.dao.entity.EntityWithPath; +import org.apache.polaris.core.persistence.pagination.Page; import org.apache.polaris.core.secrets.UserSecretsManager; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.PolarisStorageActions; @@ -169,6 +170,23 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab return isCreate; } + public ListNamespacesResponse listNamespaces( + Namespace parent, String pageToken, Integer pageSize) { + PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_NAMESPACES; + authorizeBasicNamespaceOperationOrThrow(op, parent); + + if (baseCatalog instanceof IcebergCatalog polarisCatalog) { + Page<Namespace> results = polarisCatalog.listNamespaces(parent, pageToken, pageSize); + return ListNamespacesResponse.builder() + .addAll(results.items) + .nextPageToken(results.pageToken.toTokenString()) + .build(); + } else { + return CatalogHandlers.listNamespaces( + namespaceCatalog, parent, pageToken, String.valueOf(pageSize)); + } + } + private UserSecretsManager getUserSecretsManager() { return userSecretsManager; } @@ -304,6 +322,22 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab return 
CatalogHandlers.updateNamespaceProperties(namespaceCatalog, namespace, request); } + public ListTablesResponse listTables(Namespace namespace, String pageToken, Integer pageSize) { + PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_TABLES; + authorizeBasicNamespaceOperationOrThrow(op, namespace); + + if (baseCatalog instanceof IcebergCatalog polarisCatalog) { + Page<TableIdentifier> results = polarisCatalog.listTables(namespace, pageToken, pageSize); + return ListTablesResponse.builder() + .addAll(results.items) + .nextPageToken(results.pageToken.toTokenString()) + .build(); + } else { + return CatalogHandlers.listTables( + baseCatalog, namespace, pageToken, String.valueOf(pageSize)); + } + } + public ListTablesResponse listTables(Namespace namespace) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_TABLES; authorizeBasicNamespaceOperationOrThrow(op, namespace); @@ -942,6 +976,25 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab } } + public ListTablesResponse listViews(Namespace namespace, String pageToken, Integer pageSize) { + PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_VIEWS; + authorizeBasicNamespaceOperationOrThrow(op, namespace); + + if (baseCatalog instanceof IcebergCatalog polarisCatalog) { + Page<TableIdentifier> results = polarisCatalog.listViews(namespace, pageToken, pageSize); + return ListTablesResponse.builder() + .addAll(results.items) + .nextPageToken(results.pageToken.toTokenString()) + .build(); + } else if (baseCatalog instanceof ViewCatalog viewCatalog) { + return CatalogHandlers.listViews(viewCatalog, namespace, pageToken, String.valueOf(pageSize)); + } else { + throw new BadRequestException( + "Unsupported operation: listViews with baseCatalog type: %s", + baseCatalog.getClass().getName()); + } + } + public ListTablesResponse listViews(Namespace namespace) { PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LIST_VIEWS; 
authorizeBasicNamespaceOperationOrThrow(op, namespace); diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java index 066d513a3..e0edebfc6 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java @@ -50,6 +50,7 @@ import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper; import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException; import org.apache.polaris.core.persistence.dao.entity.EntityResult; import org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult; +import org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView; import org.apache.polaris.core.policy.PolicyEntity; import org.apache.polaris.core.policy.PolicyType; @@ -167,7 +168,8 @@ public class PolicyCatalog { callContext.getPolarisCallContext(), PolarisEntity.toCoreList(catalogPath), PolarisEntityType.POLICY, - PolarisEntitySubType.NULL_SUBTYPE) + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) .getEntities() .stream() .map( diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java index a6854703d..30afc5c09 100644 --- a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java +++ b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java @@ -46,6 +46,7 @@ import org.apache.polaris.core.admin.model.StorageConfigInfo; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.*; +import 
org.apache.polaris.core.persistence.pagination.PageToken; import org.apache.polaris.service.TestServices; import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView; import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; @@ -184,7 +185,7 @@ public class FileIOFactoryTest { testServices .metaStoreManagerFactory() .getOrCreateMetaStoreManager(realmContext) - .loadTasks(callContext.getPolarisCallContext(), "testExecutor", 1) + .loadTasks(callContext.getPolarisCallContext(), "testExecutor", PageToken.fromLimit(1)) .getEntities(); Assertions.assertThat(tasks).hasSize(1); TaskEntity taskEntity = TaskEntity.of(tasks.get(0)); diff --git a/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java new file mode 100644 index 000000000..97e52fb84 --- /dev/null +++ b/service/common/src/test/java/org/apache/polaris/service/persistence/pagination/PageTokenTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.polaris.service.persistence.pagination;
+
+import org.apache.polaris.core.persistence.pagination.DonePageToken;
+import org.apache.polaris.core.persistence.pagination.HasPageSize;
+import org.apache.polaris.core.persistence.pagination.PageToken;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PageTokenTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PageTokenTest.class);
+
+  @Test
+  void testDoneToken() {
+    // Use two separate instances so equality and hash checks cannot pass by identity alone.
+    DonePageToken first = new DonePageToken();
+    DonePageToken second = new DonePageToken();
+    Assertions.assertThat(first).doesNotReturn(null, PageToken::toString);
+    Assertions.assertThat(first).returns(null, PageToken::toTokenString);
+    Assertions.assertThat(first).isEqualTo(second).hasSameHashCodeAs(second);
+  }
+
+  @Test
+  void testReadEverythingPageToken() {
+    PageToken everything = PageToken.readEverything();
+    Assertions.assertThat(everything).doesNotReturn(null, PageToken::toString);
+    Assertions.assertThat(everything).doesNotReturn(null, PageToken::toTokenString);
+    Assertions.assertThat(everything).isNotInstanceOf(HasPageSize.class);
+    Assertions.assertThat(everything).isEqualTo(PageToken.readEverything());
+  }
+}