This is an automated email from the ASF dual-hosted git repository. emaynard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push: new 22eaff4fe Fix Pagination for Catalog Federation (#1849) 22eaff4fe is described below commit 22eaff4fed431715ec8efc8c749a1707c99c48e7 Author: Rulin Xing <xjdkc...@gmail.com> AuthorDate: Wed Jun 25 09:18:28 2025 -0700 Fix Pagination for Catalog Federation (#1849) Details can be found in this issue: https://github.com/apache/polaris/issues/1848 --- .../catalog/iceberg/CatalogHandlerUtils.java | 24 +- .../catalog/iceberg/IcebergCatalogAdapter.java | 5 +- .../catalog/iceberg/IcebergCatalogHandler.java | 9 +- .../catalog/iceberg/IcebergCatalogAdapterTest.java | 245 +++++++++++++++++++++ .../org/apache/polaris/service/TestServices.java | 9 +- 5 files changed, 272 insertions(+), 20 deletions(-) diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java index ae879ea5f..480aafe7a 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java @@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import jakarta.annotation.Nullable; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import java.lang.reflect.Field; @@ -164,12 +165,19 @@ public class CatalogHandlerUtils { } } - private <T> Pair<List<T>, String> paginate(List<T> list, String pageToken, int pageSize) { + private <T> Pair<List<T>, String> paginate( + List<T> list, @Nullable String pageToken, @Nullable Integer pageSize) { + if (pageToken == null) { + return Pair.of(list, null); + } + int pageStart = INITIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken); if (pageStart >= list.size()) { return Pair.of(Collections.emptyList(), null); } + // if pageSize is null, return the rest of the list + pageSize = pageSize == null ? list.size() : pageSize; int end = Math.min(pageStart + pageSize, list.size()); List<T> subList = list.subList(pageStart, end); String nextPageToken = end >= list.size() ? null : String.valueOf(end); @@ -189,7 +197,7 @@ public class CatalogHandlerUtils { } public ListNamespacesResponse listNamespaces( - SupportsNamespaces catalog, Namespace parent, String pageToken, String pageSize) { + SupportsNamespaces catalog, Namespace parent, String pageToken, Integer pageSize) { List<Namespace> results; if (parent.isEmpty()) { @@ -198,7 +206,7 @@ public class CatalogHandlerUtils { results = catalog.listNamespaces(parent); } - Pair<List<Namespace>, String> page = paginate(results, pageToken, Integer.parseInt(pageSize)); + Pair<List<Namespace>, String> page = paginate(results, pageToken, pageSize); return ListNamespacesResponse.builder() .addAll(page.first()) @@ -269,11 +277,10 @@ public class CatalogHandlerUtils { } public ListTablesResponse listTables( - Catalog catalog, Namespace namespace, String pageToken, String pageSize) { + Catalog catalog, Namespace namespace, String pageToken, Integer pageSize) { List<TableIdentifier> results = catalog.listTables(namespace); - Pair<List<TableIdentifier>, String> page = - paginate(results, pageToken, Integer.parseInt(pageSize)); + Pair<List<TableIdentifier>, String> page = paginate(results, pageToken, pageSize); return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build(); } @@ -725,11 +732,10 @@ public class CatalogHandlerUtils { } public ListTablesResponse listViews( - ViewCatalog catalog, Namespace namespace, String pageToken, String pageSize) { + ViewCatalog catalog, Namespace namespace, String pageToken, Integer pageSize) { List<TableIdentifier> results = catalog.listViews(namespace); - Pair<List<TableIdentifier>, String> page = - paginate(results, pageToken, Integer.parseInt(pageSize)); + Pair<List<TableIdentifier>, String> page = paginate(results, pageToken, pageSize); return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build(); } diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java index c28fd491a..7218da0f3 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java @@ -21,6 +21,7 @@ package org.apache.polaris.service.catalog.iceberg; import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS; import static org.apache.polaris.service.catalog.validation.IcebergPropertiesValidation.validateIcebergProperties; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -189,8 +190,8 @@ public class IcebergCatalogAdapter } } - private IcebergCatalogHandler newHandlerWrapper( - SecurityContext securityContext, String catalogName) { + @VisibleForTesting + IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String catalogName) { validatePrincipal(securityContext); return new IcebergCatalogHandler( diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index c06e9d98d..b39e24822 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -190,8 +190,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab .nextPageToken(results.pageToken.toTokenString()) .build(); } else { - return catalogHandlerUtils.listNamespaces( - namespaceCatalog, parent, pageToken, String.valueOf(pageSize)); + return catalogHandlerUtils.listNamespaces(namespaceCatalog, parent, pageToken, pageSize); } } @@ -351,8 +350,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab .nextPageToken(results.pageToken.toTokenString()) .build(); } else { - return catalogHandlerUtils.listTables( - baseCatalog, namespace, pageToken, String.valueOf(pageSize)); + return catalogHandlerUtils.listTables(baseCatalog, namespace, pageToken, pageSize); } } @@ -1017,8 +1015,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab .nextPageToken(results.pageToken.toTokenString()) .build(); } else if (baseCatalog instanceof ViewCatalog viewCatalog) { - return catalogHandlerUtils.listViews( - viewCatalog, namespace, pageToken, String.valueOf(pageSize)); + return catalogHandlerUtils.listViews(viewCatalog, namespace, pageToken, pageSize); } else { throw new BadRequestException( "Unsupported operation: listViews with baseCatalog type: %s", diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapterTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapterTest.java new file mode 100644 index 000000000..f8f948a66 --- /dev/null +++ b/service/common/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapterTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.catalog.iceberg; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.rest.responses.ListNamespacesResponse; +import org.apache.iceberg.rest.responses.ListTablesResponse; +import org.apache.polaris.core.admin.model.AuthenticationParameters; +import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; +import org.apache.polaris.core.admin.model.BearerAuthenticationParameters; +import org.apache.polaris.core.admin.model.CatalogProperties; +import org.apache.polaris.core.admin.model.ConnectionConfigInfo; +import org.apache.polaris.core.admin.model.CreateCatalogRequest; +import org.apache.polaris.core.admin.model.ExternalCatalog; +import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo; +import org.apache.polaris.core.admin.model.StorageConfigInfo; +import org.apache.polaris.service.TestServices; +import org.assertj.core.api.Assertions; +import org.assertj.core.util.Strings; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +public class IcebergCatalogAdapterTest { + + private static final String FEDERATED_CATALOG_NAME = "polaris-federated-catalog"; + + private TestServices testServices; + private IcebergCatalogAdapter catalogAdapter; + + @BeforeEach + public void setUp() { + // Set up test services with catalog federation enabled + testServices = + TestServices.builder().config(Map.of("ENABLE_CATALOG_FEDERATION", "true")).build(); + catalogAdapter = Mockito.spy(testServices.catalogAdapter()); + + // Prepare storage and connection configs for a federated Iceberg REST catalog + String storageLocation = "s3://my-bucket/path/to/data"; + AwsStorageConfigInfo storageConfig = + AwsStorageConfigInfo.builder() + .setRoleArn("arn:aws:iam::012345678901:role/polaris-user-role") + .setExternalId("externalId") + .setUserArn("aws::a:user:arn") + .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) + .setAllowedLocations(List.of(storageLocation, "s3://externally-owned-bucket")) + .build(); + + AuthenticationParameters authParams = + BearerAuthenticationParameters.builder() + .setAuthenticationType(AuthenticationParameters.AuthenticationTypeEnum.BEARER) + .setBearerToken("xxx") + .build(); + + IcebergRestConnectionConfigInfo connectionConfig = + IcebergRestConnectionConfigInfo.builder() + .setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.ICEBERG_REST) + .setAuthenticationParameters(authParams) + .setUri("http://localhost:8080/api/v1/catalogs") + .setRemoteCatalogName("remote-catalog") + .build(); + + // Register the catalog in the test environment + testServices + .catalogsApi() + .createCatalog( + new CreateCatalogRequest( + ExternalCatalog.builder() + .setName(FEDERATED_CATALOG_NAME) + .setProperties( + CatalogProperties.builder().setDefaultBaseLocation(storageLocation).build()) + .setConnectionConfigInfo(connectionConfig) + .setStorageConfigInfo(storageConfig) + .build()), + testServices.realmContext(), + testServices.securityContext()); + } + + @ParameterizedTest(name = "[{index}] initialPageToken={0}, pageSize={1}") + @MethodSource("paginationTestCases") + void testPaginationForNonIcebergCatalog(String initialPageToken, Integer pageSize) + throws IOException { + + try (InMemoryCatalog inMemoryCatalog = new InMemoryCatalog()) { + // Initialize and replace the default handler with one backed by in-memory catalog + inMemoryCatalog.initialize("inMemory", Map.of()); + mockCatalogAdapter(inMemoryCatalog); + + // Set up 10 entities in the catalog: 10 namespaces, 10 tables, 10 views + int entityCount = 10; + for (int i = 0; i < entityCount; ++i) { + inMemoryCatalog.createNamespace(Namespace.of("ns" + i)); + inMemoryCatalog.createTable(TableIdentifier.of("ns0", "table" + i), new Schema()); + inMemoryCatalog + .buildView(TableIdentifier.of("ns0", "view" + i)) + .withSchema(new Schema()) + .withDefaultNamespace(Namespace.of("ns0")) + .withQuery("a", "SELECT * FROM ns0.table" + i) + .create(); + } + + // Determine starting index for pagination based on the initial page token + int pageStart = + Strings.isNullOrEmpty(initialPageToken) ? 0 : Integer.parseInt(initialPageToken); + int remain = entityCount - pageStart; + + // Initial tokens for pagination + String listNamespacePageToken = initialPageToken; + String listTablesPageToken = initialPageToken; + String listViewsPageToken = initialPageToken; + + // Simulate page-by-page fetching until all entities are consumed + while (remain > 0) { + int expectedSize = + (pageSize != null && initialPageToken != null) ? Math.min(remain, pageSize) : remain; + + // Verify namespaces pagination + ListNamespacesResponse namespacesResponse = + (ListNamespacesResponse) + catalogAdapter + .listNamespaces( + FEDERATED_CATALOG_NAME, + listNamespacePageToken, + pageSize, + null, + testServices.realmContext(), + testServices.securityContext()) + .getEntity(); + Assertions.assertThat(namespacesResponse.namespaces()).hasSize(expectedSize); + listNamespacePageToken = namespacesResponse.nextPageToken(); + + // Verify tables pagination + ListTablesResponse tablesResponse = + (ListTablesResponse) + catalogAdapter + .listTables( + FEDERATED_CATALOG_NAME, + "ns0", + listTablesPageToken, + pageSize, + testServices.realmContext(), + testServices.securityContext()) + .getEntity(); + Assertions.assertThat(tablesResponse.identifiers()).hasSize(expectedSize); + listTablesPageToken = tablesResponse.nextPageToken(); + + // Verify views pagination + ListTablesResponse viewsResponse = + (ListTablesResponse) + catalogAdapter + .listViews( + FEDERATED_CATALOG_NAME, + "ns0", + listViewsPageToken, + pageSize, + testServices.realmContext(), + testServices.securityContext()) + .getEntity(); + Assertions.assertThat(viewsResponse.identifiers()).hasSize(expectedSize); + listViewsPageToken = viewsResponse.nextPageToken(); + + remain -= expectedSize; + } + } + } + + private void mockCatalogAdapter(org.apache.iceberg.catalog.Catalog catalog) { + // Override handler creation to inject in-memory catalog and suppress actual close() + Mockito.doAnswer( + invocation -> { + IcebergCatalogHandler realHandler = + (IcebergCatalogHandler) invocation.callRealMethod(); + IcebergCatalogHandler wrappedHandler = Mockito.spy(realHandler); + + // Override initializeCatalog to inject test catalog using reflection + Mockito.doAnswer( + innerInvocation -> { + for (String fieldName : + List.of("baseCatalog", "namespaceCatalog", "viewCatalog")) { + Field field = IcebergCatalogHandler.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(wrappedHandler, catalog); + } + return null; + }) + .when(wrappedHandler) + .initializeCatalog(); + + // Prevent catalog from being closed during test lifecycle + Mockito.doNothing().when(wrappedHandler).close(); + + return wrappedHandler; + }) + .when(catalogAdapter) + .newHandlerWrapper(Mockito.any(), Mockito.any()); + } + + private static Stream<Arguments> paginationTestCases() { + return Stream.of( + Arguments.of(null, null), + Arguments.of(null, 1), + Arguments.of(null, 3), + Arguments.of(null, 5), + Arguments.of(null, 10), + Arguments.of(null, 20), + Arguments.of("", null), + Arguments.of("", 1), + Arguments.of("", 3), + Arguments.of("", 5), + Arguments.of("", 10), + Arguments.of("", 20), + Arguments.of("5", null), + Arguments.of("5", 1), + Arguments.of("5", 3), + Arguments.of("5", 5), + Arguments.of("5", 10)); + } +} diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 8f63ecfb8..48f4d713b 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -73,6 +73,7 @@ public record TestServices( PolarisCatalogsApi catalogsApi, IcebergRestCatalogApi restApi, IcebergRestConfigurationApi restConfigurationApi, + IcebergCatalogAdapter catalogAdapter, PolarisConfigurationStore configurationStore, PolarisDiagnostics polarisDiagnostics, RealmEntityManagerFactory entityManagerFactory, @@ -197,7 +198,7 @@ public record TestServices( CatalogHandlerUtils catalogHandlerUtils = new CatalogHandlerUtils(callContext.getRealmContext(), configurationStore); - IcebergCatalogAdapter service = + IcebergCatalogAdapter catalogService = new IcebergCatalogAdapter( realmContext, callContext, @@ -210,8 +211,9 @@ public record TestServices( reservedProperties, catalogHandlerUtils); - IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service); - IcebergRestConfigurationApi restConfigurationApi = new IcebergRestConfigurationApi(service); + IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(catalogService); + IcebergRestConfigurationApi restConfigurationApi = + new IcebergRestConfigurationApi(catalogService); CreatePrincipalResult createdPrincipal = metaStoreManager.createPrincipal( @@ -262,6 +264,7 @@ public record TestServices( catalogsApi, restApi, restConfigurationApi, + catalogService, configurationStore, polarisDiagnostics, realmEntityManagerFactory,