This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 22eaff4fe Fix Pagination for Catalog Federation (#1849)
22eaff4fe is described below

commit 22eaff4fed431715ec8efc8c749a1707c99c48e7
Author: Rulin Xing <xjdkc...@gmail.com>
AuthorDate: Wed Jun 25 09:18:28 2025 -0700

    Fix Pagination for Catalog Federation (#1849)
    
    Details can be found in this issue: 
https://github.com/apache/polaris/issues/1848
---
 .../catalog/iceberg/CatalogHandlerUtils.java       |  24 +-
 .../catalog/iceberg/IcebergCatalogAdapter.java     |   5 +-
 .../catalog/iceberg/IcebergCatalogHandler.java     |   9 +-
 .../catalog/iceberg/IcebergCatalogAdapterTest.java | 245 +++++++++++++++++++++
 .../org/apache/polaris/service/TestServices.java   |   9 +-
 5 files changed, 272 insertions(+), 20 deletions(-)

diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
index ae879ea5f..480aafe7a 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import jakarta.annotation.Nullable;
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
 import java.lang.reflect.Field;
@@ -164,12 +165,19 @@ public class CatalogHandlerUtils {
     }
   }
 
-  private <T> Pair<List<T>, String> paginate(List<T> list, String pageToken, 
int pageSize) {
+  private <T> Pair<List<T>, String> paginate(
+      List<T> list, @Nullable String pageToken, @Nullable Integer pageSize) {
+    if (pageToken == null) {
+      return Pair.of(list, null);
+    }
+
     int pageStart = INITIAL_PAGE_TOKEN.equals(pageToken) ? 0 : 
Integer.parseInt(pageToken);
     if (pageStart >= list.size()) {
       return Pair.of(Collections.emptyList(), null);
     }
 
+    // if pageSize is null, return the rest of the list
+    pageSize = pageSize == null ? list.size() : pageSize;
     int end = Math.min(pageStart + pageSize, list.size());
     List<T> subList = list.subList(pageStart, end);
     String nextPageToken = end >= list.size() ? null : String.valueOf(end);
@@ -189,7 +197,7 @@ public class CatalogHandlerUtils {
   }
 
   public ListNamespacesResponse listNamespaces(
-      SupportsNamespaces catalog, Namespace parent, String pageToken, String 
pageSize) {
+      SupportsNamespaces catalog, Namespace parent, String pageToken, Integer 
pageSize) {
     List<Namespace> results;
 
     if (parent.isEmpty()) {
@@ -198,7 +206,7 @@ public class CatalogHandlerUtils {
       results = catalog.listNamespaces(parent);
     }
 
-    Pair<List<Namespace>, String> page = paginate(results, pageToken, 
Integer.parseInt(pageSize));
+    Pair<List<Namespace>, String> page = paginate(results, pageToken, 
pageSize);
 
     return ListNamespacesResponse.builder()
         .addAll(page.first())
@@ -269,11 +277,10 @@ public class CatalogHandlerUtils {
   }
 
   public ListTablesResponse listTables(
-      Catalog catalog, Namespace namespace, String pageToken, String pageSize) 
{
+      Catalog catalog, Namespace namespace, String pageToken, Integer 
pageSize) {
     List<TableIdentifier> results = catalog.listTables(namespace);
 
-    Pair<List<TableIdentifier>, String> page =
-        paginate(results, pageToken, Integer.parseInt(pageSize));
+    Pair<List<TableIdentifier>, String> page = paginate(results, pageToken, 
pageSize);
 
     return 
ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build();
   }
@@ -725,11 +732,10 @@ public class CatalogHandlerUtils {
   }
 
   public ListTablesResponse listViews(
-      ViewCatalog catalog, Namespace namespace, String pageToken, String 
pageSize) {
+      ViewCatalog catalog, Namespace namespace, String pageToken, Integer 
pageSize) {
     List<TableIdentifier> results = catalog.listViews(namespace);
 
-    Pair<List<TableIdentifier>, String> page =
-        paginate(results, pageToken, Integer.parseInt(pageSize));
+    Pair<List<TableIdentifier>, String> page = paginate(results, pageToken, 
pageSize);
 
     return 
ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build();
   }
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
index c28fd491a..7218da0f3 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
@@ -21,6 +21,7 @@ package org.apache.polaris.service.catalog.iceberg;
 import static 
org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;
 import static 
org.apache.polaris.service.catalog.validation.IcebergPropertiesValidation.validateIcebergProperties;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -189,8 +190,8 @@ public class IcebergCatalogAdapter
     }
   }
 
-  private IcebergCatalogHandler newHandlerWrapper(
-      SecurityContext securityContext, String catalogName) {
+  @VisibleForTesting
+  IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, 
String catalogName) {
     validatePrincipal(securityContext);
 
     return new IcebergCatalogHandler(
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index c06e9d98d..b39e24822 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -190,8 +190,7 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
           .nextPageToken(results.pageToken.toTokenString())
           .build();
     } else {
-      return catalogHandlerUtils.listNamespaces(
-          namespaceCatalog, parent, pageToken, String.valueOf(pageSize));
+      return catalogHandlerUtils.listNamespaces(namespaceCatalog, parent, 
pageToken, pageSize);
     }
   }
 
@@ -351,8 +350,7 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
           .nextPageToken(results.pageToken.toTokenString())
           .build();
     } else {
-      return catalogHandlerUtils.listTables(
-          baseCatalog, namespace, pageToken, String.valueOf(pageSize));
+      return catalogHandlerUtils.listTables(baseCatalog, namespace, pageToken, 
pageSize);
     }
   }
 
@@ -1017,8 +1015,7 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
           .nextPageToken(results.pageToken.toTokenString())
           .build();
     } else if (baseCatalog instanceof ViewCatalog viewCatalog) {
-      return catalogHandlerUtils.listViews(
-          viewCatalog, namespace, pageToken, String.valueOf(pageSize));
+      return catalogHandlerUtils.listViews(viewCatalog, namespace, pageToken, 
pageSize);
     } else {
       throw new BadRequestException(
           "Unsupported operation: listViews with baseCatalog type: %s",
diff --git 
a/service/common/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapterTest.java
 
b/service/common/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapterTest.java
new file mode 100644
index 000000000..f8f948a66
--- /dev/null
+++ 
b/service/common/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapterTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.catalog.iceberg;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.rest.responses.ListNamespacesResponse;
+import org.apache.iceberg.rest.responses.ListTablesResponse;
+import org.apache.polaris.core.admin.model.AuthenticationParameters;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.BearerAuthenticationParameters;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
+import org.apache.polaris.core.admin.model.CreateCatalogRequest;
+import org.apache.polaris.core.admin.model.ExternalCatalog;
+import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.service.TestServices;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.util.Strings;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+
+public class IcebergCatalogAdapterTest {
+
+  private static final String FEDERATED_CATALOG_NAME = 
"polaris-federated-catalog";
+
+  private TestServices testServices;
+  private IcebergCatalogAdapter catalogAdapter;
+
+  @BeforeEach
+  public void setUp() {
+    // Set up test services with catalog federation enabled
+    testServices =
+        TestServices.builder().config(Map.of("ENABLE_CATALOG_FEDERATION", 
"true")).build();
+    catalogAdapter = Mockito.spy(testServices.catalogAdapter());
+
+    // Prepare storage and connection configs for a federated Iceberg REST 
catalog
+    String storageLocation = "s3://my-bucket/path/to/data";
+    AwsStorageConfigInfo storageConfig =
+        AwsStorageConfigInfo.builder()
+            .setRoleArn("arn:aws:iam::012345678901:role/polaris-user-role")
+            .setExternalId("externalId")
+            .setUserArn("aws::a:user:arn")
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+            .setAllowedLocations(List.of(storageLocation, 
"s3://externally-owned-bucket"))
+            .build();
+
+    AuthenticationParameters authParams =
+        BearerAuthenticationParameters.builder()
+            
.setAuthenticationType(AuthenticationParameters.AuthenticationTypeEnum.BEARER)
+            .setBearerToken("xxx")
+            .build();
+
+    IcebergRestConnectionConfigInfo connectionConfig =
+        IcebergRestConnectionConfigInfo.builder()
+            
.setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.ICEBERG_REST)
+            .setAuthenticationParameters(authParams)
+            .setUri("http://localhost:8080/api/v1/catalogs";)
+            .setRemoteCatalogName("remote-catalog")
+            .build();
+
+    // Register the catalog in the test environment
+    testServices
+        .catalogsApi()
+        .createCatalog(
+            new CreateCatalogRequest(
+                ExternalCatalog.builder()
+                    .setName(FEDERATED_CATALOG_NAME)
+                    .setProperties(
+                        
CatalogProperties.builder().setDefaultBaseLocation(storageLocation).build())
+                    .setConnectionConfigInfo(connectionConfig)
+                    .setStorageConfigInfo(storageConfig)
+                    .build()),
+            testServices.realmContext(),
+            testServices.securityContext());
+  }
+
+  @ParameterizedTest(name = "[{index}] initialPageToken={0}, pageSize={1}")
+  @MethodSource("paginationTestCases")
+  void testPaginationForNonIcebergCatalog(String initialPageToken, Integer 
pageSize)
+      throws IOException {
+
+    try (InMemoryCatalog inMemoryCatalog = new InMemoryCatalog()) {
+      // Initialize and replace the default handler with one backed by 
in-memory catalog
+      inMemoryCatalog.initialize("inMemory", Map.of());
+      mockCatalogAdapter(inMemoryCatalog);
+
+      // Set up 10 entities in the catalog: 10 namespaces, 10 tables, 10 views
+      int entityCount = 10;
+      for (int i = 0; i < entityCount; ++i) {
+        inMemoryCatalog.createNamespace(Namespace.of("ns" + i));
+        inMemoryCatalog.createTable(TableIdentifier.of("ns0", "table" + i), 
new Schema());
+        inMemoryCatalog
+            .buildView(TableIdentifier.of("ns0", "view" + i))
+            .withSchema(new Schema())
+            .withDefaultNamespace(Namespace.of("ns0"))
+            .withQuery("a", "SELECT * FROM ns0.table" + i)
+            .create();
+      }
+
+      // Determine starting index for pagination based on the initial page 
token
+      int pageStart =
+          Strings.isNullOrEmpty(initialPageToken) ? 0 : 
Integer.parseInt(initialPageToken);
+      int remain = entityCount - pageStart;
+
+      // Initial tokens for pagination
+      String listNamespacePageToken = initialPageToken;
+      String listTablesPageToken = initialPageToken;
+      String listViewsPageToken = initialPageToken;
+
+      // Simulate page-by-page fetching until all entities are consumed
+      while (remain > 0) {
+        int expectedSize =
+            (pageSize != null && initialPageToken != null) ? Math.min(remain, 
pageSize) : remain;
+
+        // Verify namespaces pagination
+        ListNamespacesResponse namespacesResponse =
+            (ListNamespacesResponse)
+                catalogAdapter
+                    .listNamespaces(
+                        FEDERATED_CATALOG_NAME,
+                        listNamespacePageToken,
+                        pageSize,
+                        null,
+                        testServices.realmContext(),
+                        testServices.securityContext())
+                    .getEntity();
+        
Assertions.assertThat(namespacesResponse.namespaces()).hasSize(expectedSize);
+        listNamespacePageToken = namespacesResponse.nextPageToken();
+
+        // Verify tables pagination
+        ListTablesResponse tablesResponse =
+            (ListTablesResponse)
+                catalogAdapter
+                    .listTables(
+                        FEDERATED_CATALOG_NAME,
+                        "ns0",
+                        listTablesPageToken,
+                        pageSize,
+                        testServices.realmContext(),
+                        testServices.securityContext())
+                    .getEntity();
+        
Assertions.assertThat(tablesResponse.identifiers()).hasSize(expectedSize);
+        listTablesPageToken = tablesResponse.nextPageToken();
+
+        // Verify views pagination
+        ListTablesResponse viewsResponse =
+            (ListTablesResponse)
+                catalogAdapter
+                    .listViews(
+                        FEDERATED_CATALOG_NAME,
+                        "ns0",
+                        listViewsPageToken,
+                        pageSize,
+                        testServices.realmContext(),
+                        testServices.securityContext())
+                    .getEntity();
+        
Assertions.assertThat(viewsResponse.identifiers()).hasSize(expectedSize);
+        listViewsPageToken = viewsResponse.nextPageToken();
+
+        remain -= expectedSize;
+      }
+    }
+  }
+
+  private void mockCatalogAdapter(org.apache.iceberg.catalog.Catalog catalog) {
+    // Override handler creation to inject in-memory catalog and suppress 
actual close()
+    Mockito.doAnswer(
+            invocation -> {
+              IcebergCatalogHandler realHandler =
+                  (IcebergCatalogHandler) invocation.callRealMethod();
+              IcebergCatalogHandler wrappedHandler = Mockito.spy(realHandler);
+
+              // Override initializeCatalog to inject test catalog using 
reflection
+              Mockito.doAnswer(
+                      innerInvocation -> {
+                        for (String fieldName :
+                            List.of("baseCatalog", "namespaceCatalog", 
"viewCatalog")) {
+                          Field field = 
IcebergCatalogHandler.class.getDeclaredField(fieldName);
+                          field.setAccessible(true);
+                          field.set(wrappedHandler, catalog);
+                        }
+                        return null;
+                      })
+                  .when(wrappedHandler)
+                  .initializeCatalog();
+
+              // Prevent catalog from being closed during test lifecycle
+              Mockito.doNothing().when(wrappedHandler).close();
+
+              return wrappedHandler;
+            })
+        .when(catalogAdapter)
+        .newHandlerWrapper(Mockito.any(), Mockito.any());
+  }
+
+  private static Stream<Arguments> paginationTestCases() {
+    return Stream.of(
+        Arguments.of(null, null),
+        Arguments.of(null, 1),
+        Arguments.of(null, 3),
+        Arguments.of(null, 5),
+        Arguments.of(null, 10),
+        Arguments.of(null, 20),
+        Arguments.of("", null),
+        Arguments.of("", 1),
+        Arguments.of("", 3),
+        Arguments.of("", 5),
+        Arguments.of("", 10),
+        Arguments.of("", 20),
+        Arguments.of("5", null),
+        Arguments.of("5", 1),
+        Arguments.of("5", 3),
+        Arguments.of("5", 5),
+        Arguments.of("5", 10));
+  }
+}
diff --git 
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
 
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 8f63ecfb8..48f4d713b 100644
--- 
a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++ 
b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -73,6 +73,7 @@ public record TestServices(
     PolarisCatalogsApi catalogsApi,
     IcebergRestCatalogApi restApi,
     IcebergRestConfigurationApi restConfigurationApi,
+    IcebergCatalogAdapter catalogAdapter,
     PolarisConfigurationStore configurationStore,
     PolarisDiagnostics polarisDiagnostics,
     RealmEntityManagerFactory entityManagerFactory,
@@ -197,7 +198,7 @@ public record TestServices(
       CatalogHandlerUtils catalogHandlerUtils =
           new CatalogHandlerUtils(callContext.getRealmContext(), 
configurationStore);
 
-      IcebergCatalogAdapter service =
+      IcebergCatalogAdapter catalogService =
           new IcebergCatalogAdapter(
               realmContext,
               callContext,
@@ -210,8 +211,9 @@ public record TestServices(
               reservedProperties,
               catalogHandlerUtils);
 
-      IcebergRestCatalogApi restApi = new IcebergRestCatalogApi(service);
-      IcebergRestConfigurationApi restConfigurationApi = new 
IcebergRestConfigurationApi(service);
+      IcebergRestCatalogApi restApi = new 
IcebergRestCatalogApi(catalogService);
+      IcebergRestConfigurationApi restConfigurationApi =
+          new IcebergRestConfigurationApi(catalogService);
 
       CreatePrincipalResult createdPrincipal =
           metaStoreManager.createPrincipal(
@@ -262,6 +264,7 @@ public record TestServices(
           catalogsApi,
           restApi,
           restConfigurationApi,
+          catalogService,
           configurationStore,
           polarisDiagnostics,
           realmEntityManagerFactory,

Reply via email to