This is an automated email from the ASF dual-hosted git repository. emaynard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push: new 0a9a5d969 feat(cdi): Remove CallContext.close() (#1776) 0a9a5d969 is described below commit 0a9a5d96934d7bbe1330113cd3531776e732fef6 Author: Alexandre Dutra <adu...@users.noreply.github.com> AuthorDate: Tue Jun 3 16:37:23 2025 +0200 feat(cdi): Remove CallContext.close() (#1776) Now that every catalog created by PolarisCallContextCatalogFactory is correctly closed by IcebergCatalogAdapter, we can finally remove the CallContext.close() method and the associated closeables group. This simplification will hopefully pave the way to a more robust handling of request-scoped beans in task executor threads. --- .../apache/polaris/core/context/CallContext.java | 62 +--- .../storage/InMemoryStorageIntegrationTest.java | 72 ++-- .../BasePolarisMetaStoreManagerTest.java | 225 ++++++------ .../service/quarkus/config/QuarkusProducers.java | 4 - .../quarkus/auth/JWTSymmetricKeyGeneratorTest.java | 20 -- .../task/BatchFileCleanupTaskHandlerTest.java | 396 ++++++++++----------- .../task/ManifestFileCleanupTaskHandlerTest.java | 273 +++++++------- .../test/PolarisIntegrationTestFixture.java | 9 +- .../service/catalog/iceberg/IcebergCatalog.java | 1 - .../catalog/PolarisCallContextCatalogFactory.java | 2 - .../service/catalog/io/FileIOFactoryTest.java | 6 - .../org/apache/polaris/service/TestServices.java | 7 - 12 files changed, 470 insertions(+), 607 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/context/CallContext.java b/polaris-core/src/main/java/org/apache/polaris/core/context/CallContext.java index 6cd56bc30..54859647d 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/context/CallContext.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/context/CallContext.java @@ -18,16 +18,8 @@ */ package org.apache.polaris.core.context; -import jakarta.annotation.Nonnull; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import 
java.util.stream.Collectors; -import org.apache.iceberg.io.CloseableGroup; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Stores elements associated with an individual REST request such as RealmContext, caller @@ -37,15 +29,9 @@ import org.slf4j.LoggerFactory; * principal/role entities may be defined within a Realm-specific persistence layer, and the * underlying nature of the persistence layer may differ between different realms. */ -public interface CallContext extends AutoCloseable { +public interface CallContext { InheritableThreadLocal<CallContext> CURRENT_CONTEXT = new InheritableThreadLocal<>(); - // For requests that make use of a Catalog instance, this holds the instance that was - // created, scoped to the current call context. - String REQUEST_PATH_CATALOG_INSTANCE_KEY = "REQUEST_PATH_CATALOG_INSTANCE"; - - String CLOSEABLES = "closeables"; - static CallContext setCurrentContext(CallContext context) { CURRENT_CONTEXT.set(context); return context; @@ -65,7 +51,6 @@ public interface CallContext extends AutoCloseable { static CallContext of( final RealmContext realmContext, final PolarisCallContext polarisCallContext) { - Map<String, Object> map = new HashMap<>(); return new CallContext() { @Override public RealmContext getRealmContext() { @@ -76,28 +61,14 @@ public interface CallContext extends AutoCloseable { public PolarisCallContext getPolarisCallContext() { return polarisCallContext; } - - @Override - public Map<String, Object> contextVariables() { - return map; - } }; } - /** - * Copy the {@link CallContext}. {@link #contextVariables()} will be copied except for {@link - * #closeables()}. The original {@link #contextVariables()} map is untouched and {@link - * #closeables()} in the original {@link CallContext} should be closed along with the {@link - * CallContext}. - */ + /** Copy the {@link CallContext}. 
*/ static CallContext copyOf(CallContext base) { String realmId = base.getRealmContext().getRealmIdentifier(); RealmContext realmContext = () -> realmId; PolarisCallContext polarisCallContext = PolarisCallContext.copyOf(base.getPolarisCallContext()); - Map<String, Object> contextVariables = - base.contextVariables().entrySet().stream() - .filter(e -> !e.getKey().equals(CLOSEABLES)) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); return new CallContext() { @Override public RealmContext getRealmContext() { @@ -108,11 +79,6 @@ public interface CallContext extends AutoCloseable { public PolarisCallContext getPolarisCallContext() { return polarisCallContext; } - - @Override - public Map<String, Object> contextVariables() { - return contextVariables; - } }; } @@ -122,28 +88,4 @@ public interface CallContext extends AutoCloseable { * @return the inner context used for delegating services */ PolarisCallContext getPolarisCallContext(); - - Map<String, Object> contextVariables(); - - default @Nonnull CloseableGroup closeables() { - return (CloseableGroup) - contextVariables().computeIfAbsent(CLOSEABLES, key -> new CloseableGroup()); - } - - @Override - default void close() { - if (CURRENT_CONTEXT.get() == this) { - unsetCurrentContext(); - CloseableGroup closeables = closeables(); - try { - closeables.close(); - } catch (IOException e) { - Logger logger = LoggerFactory.getLogger(CallContext.class); - logger - .atWarn() - .addKeyValue("closeableGroup", closeables) - .log("Unable to close closeable group", e); - } - } - } } diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java index d901f77f3..8d46171b6 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java +++ 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java @@ -106,43 +106,41 @@ class InMemoryStorageIntegrationTest { } }, Clock.systemUTC()); - try (CallContext ignored = - CallContext.setCurrentContext(CallContext.of(() -> "realm", polarisCallContext))) { - Map<String, Map<PolarisStorageActions, PolarisStorageIntegration.ValidationResult>> result = - storage.validateAccessToLocations( - new FileStorageConfigurationInfo(List.of("file://", "*")), - Set.of(PolarisStorageActions.READ), - Set.of( - "s3://bucket/path/to/warehouse/namespace/table", - "file:///etc/passwd", - "a/relative/subdirectory")); - Assertions.assertThat(result) - .hasSize(3) - .hasEntrySatisfying( - "s3://bucket/path/to/warehouse/namespace/table", - val -> - Assertions.assertThat(val) - .hasSize(1) - .containsKey(PolarisStorageActions.READ) - .extractingByKey(PolarisStorageActions.READ) - .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)) - .hasEntrySatisfying( - "file:///etc/passwd", - val -> - Assertions.assertThat(val) - .hasSize(1) - .containsKey(PolarisStorageActions.READ) - .extractingByKey(PolarisStorageActions.READ) - .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)) - .hasEntrySatisfying( - "a/relative/subdirectory", - val -> - Assertions.assertThat(val) - .hasSize(1) - .containsKey(PolarisStorageActions.READ) - .extractingByKey(PolarisStorageActions.READ) - .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)); - } + CallContext.setCurrentContext(CallContext.of(() -> "realm", polarisCallContext)); + Map<String, Map<PolarisStorageActions, PolarisStorageIntegration.ValidationResult>> result = + storage.validateAccessToLocations( + new FileStorageConfigurationInfo(List.of("file://", "*")), + Set.of(PolarisStorageActions.READ), + Set.of( + "s3://bucket/path/to/warehouse/namespace/table", + "file:///etc/passwd", + "a/relative/subdirectory")); + Assertions.assertThat(result) + .hasSize(3) + 
.hasEntrySatisfying( + "s3://bucket/path/to/warehouse/namespace/table", + val -> + Assertions.assertThat(val) + .hasSize(1) + .containsKey(PolarisStorageActions.READ) + .extractingByKey(PolarisStorageActions.READ) + .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)) + .hasEntrySatisfying( + "file:///etc/passwd", + val -> + Assertions.assertThat(val) + .hasSize(1) + .containsKey(PolarisStorageActions.READ) + .extractingByKey(PolarisStorageActions.READ) + .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)) + .hasEntrySatisfying( + "a/relative/subdirectory", + val -> + Assertions.assertThat(val) + .hasSize(1) + .containsKey(PolarisStorageActions.READ) + .extractingByKey(PolarisStorageActions.READ) + .returns(true, PolarisStorageIntegration.ValidationResult::isSuccess)); } @Test diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java index 0f834bc76..5abda4460 100644 --- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java +++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java @@ -104,137 +104,126 @@ public abstract class BasePolarisMetaStoreManagerTest { @Test protected void testCreateEntities() { PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager; - try (CallContext callCtx = - CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext)) { - if (CallContext.getCurrentContext() == null) { - CallContext.setCurrentContext(callCtx); - } - TaskEntity task1 = createTask("task1", 100L); - TaskEntity task2 = createTask("task2", 101L); - List<PolarisBaseEntity> createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, null, 
List.of(task1, task2)) - .getEntities(); - - Assertions.assertThat(createdEntities) - .isNotNull() - .hasSize(2) - .extracting(PolarisEntity::toCore) - .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); - - List<EntityNameLookupRecord> listedEntities = - metaStoreManager - .listEntities( - polarisTestMetaStoreManager.polarisCallContext, - null, - PolarisEntityType.TASK, - PolarisEntitySubType.NULL_SUBTYPE, - PageToken.readEverything()) - .getEntities(); - Assertions.assertThat(listedEntities) - .isNotNull() - .hasSize(2) - .containsExactly( - new EntityNameLookupRecord( - task1.getCatalogId(), - task1.getId(), - task1.getParentId(), - task1.getName(), - task1.getTypeCode(), - task1.getSubTypeCode()), - new EntityNameLookupRecord( - task2.getCatalogId(), - task2.getId(), - task2.getParentId(), - task2.getName(), - task2.getTypeCode(), - task2.getSubTypeCode())); + CallContext callCtx = + CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext); + if (CallContext.getCurrentContext() == null) { + CallContext.setCurrentContext(callCtx); } + TaskEntity task1 = createTask("task1", 100L); + TaskEntity task2 = createTask("task2", 101L); + List<PolarisBaseEntity> createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2)) + .getEntities(); + + Assertions.assertThat(createdEntities) + .isNotNull() + .hasSize(2) + .extracting(PolarisEntity::toCore) + .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); + + List<EntityNameLookupRecord> listedEntities = + metaStoreManager + .listEntities( + polarisTestMetaStoreManager.polarisCallContext, + null, + PolarisEntityType.TASK, + PolarisEntitySubType.NULL_SUBTYPE, + PageToken.readEverything()) + .getEntities(); + Assertions.assertThat(listedEntities) + .isNotNull() + .hasSize(2) + .containsExactly( + new EntityNameLookupRecord( + task1.getCatalogId(), + task1.getId(), + 
task1.getParentId(), + task1.getName(), + task1.getTypeCode(), + task1.getSubTypeCode()), + new EntityNameLookupRecord( + task2.getCatalogId(), + task2.getId(), + task2.getParentId(), + task2.getName(), + task2.getTypeCode(), + task2.getSubTypeCode())); } @Test protected void testCreateEntitiesAlreadyExisting() { PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager; - try (CallContext callCtx = - CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext)) { - if (CallContext.getCurrentContext() == null) { - CallContext.setCurrentContext(callCtx); - } - TaskEntity task1 = createTask("task1", 100L); - TaskEntity task2 = createTask("task2", 101L); - List<PolarisBaseEntity> createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2)) - .getEntities(); - - Assertions.assertThat(createdEntities) - .isNotNull() - .hasSize(2) - .extracting(PolarisEntity::toCore) - .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); - - TaskEntity task3 = createTask("task3", 103L); - - // entities task1 and task2 already exist with the same identifier, so the full list is - // returned - createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, - null, - List.of(task1, task2, task3)) - .getEntities(); - Assertions.assertThat(createdEntities) - .isNotNull() - .hasSize(3) - .extracting(PolarisEntity::toCore) - .containsExactly( - PolarisEntity.toCore(task1), - PolarisEntity.toCore(task2), - PolarisEntity.toCore(task3)); + CallContext callCtx = + CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext); + if (CallContext.getCurrentContext() == null) { + CallContext.setCurrentContext(callCtx); } + TaskEntity task1 = createTask("task1", 100L); + TaskEntity task2 = createTask("task2", 101L); + List<PolarisBaseEntity> createdEntities = + 
metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2)) + .getEntities(); + + Assertions.assertThat(createdEntities) + .isNotNull() + .hasSize(2) + .extracting(PolarisEntity::toCore) + .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); + + TaskEntity task3 = createTask("task3", 103L); + + // entities task1 and task2 already exist with the same identifier, so the full list is + // returned + createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2, task3)) + .getEntities(); + Assertions.assertThat(createdEntities) + .isNotNull() + .hasSize(3) + .extracting(PolarisEntity::toCore) + .containsExactly( + PolarisEntity.toCore(task1), PolarisEntity.toCore(task2), PolarisEntity.toCore(task3)); } @Test protected void testCreateEntitiesWithConflict() { PolarisMetaStoreManager metaStoreManager = polarisTestMetaStoreManager.polarisMetaStoreManager; - try (CallContext callCtx = - CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext)) { - if (CallContext.getCurrentContext() == null) { - CallContext.setCurrentContext(callCtx); - } - TaskEntity task1 = createTask("task1", 100L); - TaskEntity task2 = createTask("task2", 101L); - TaskEntity task3 = createTask("task3", 103L); - List<PolarisBaseEntity> createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - polarisTestMetaStoreManager.polarisCallContext, - null, - List.of(task1, task2, task3)) - .getEntities(); - - Assertions.assertThat(createdEntities) - .isNotNull() - .hasSize(3) - .extracting(PolarisEntity::toCore) - .containsExactly( - PolarisEntity.toCore(task1), - PolarisEntity.toCore(task2), - PolarisEntity.toCore(task3)); - - TaskEntity secondTask3 = createTask("task3", 104L); - - TaskEntity task4 = createTask("task4", 105L); - createdEntities = - metaStoreManager - .createEntitiesIfNotExist( - 
polarisTestMetaStoreManager.polarisCallContext, null, List.of(secondTask3, task4)) - .getEntities(); - Assertions.assertThat(createdEntities).isNull(); + CallContext callCtx = + CallContext.of(() -> "testRealm", polarisTestMetaStoreManager.polarisCallContext); + if (CallContext.getCurrentContext() == null) { + CallContext.setCurrentContext(callCtx); } + TaskEntity task1 = createTask("task1", 100L); + TaskEntity task2 = createTask("task2", 101L); + TaskEntity task3 = createTask("task3", 103L); + List<PolarisBaseEntity> createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(task1, task2, task3)) + .getEntities(); + + Assertions.assertThat(createdEntities) + .isNotNull() + .hasSize(3) + .extracting(PolarisEntity::toCore) + .containsExactly( + PolarisEntity.toCore(task1), PolarisEntity.toCore(task2), PolarisEntity.toCore(task3)); + + TaskEntity secondTask3 = createTask("task3", 104L); + + TaskEntity task4 = createTask("task4", 105L); + createdEntities = + metaStoreManager + .createEntitiesIfNotExist( + polarisTestMetaStoreManager.polarisCallContext, null, List.of(secondTask3, task4)) + .getEntities(); + Assertions.assertThat(createdEntities).isNull(); } private static TaskEntity createTask(String taskName, long id) { diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java index 40b79980d..c4b3a3ae1 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java @@ -138,10 +138,6 @@ public class QuarkusProducers { return CallContext.of(realmContext, polarisCallContext); } - public void closeCallContext(@Disposes CallContext callContext) { - callContext.close(); - } - // Polaris service beans - selected from 
@Identifier-annotated beans @Produces diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/auth/JWTSymmetricKeyGeneratorTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/auth/JWTSymmetricKeyGeneratorTest.java index fed5d20db..b7c3ceef4 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/auth/JWTSymmetricKeyGeneratorTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/auth/JWTSymmetricKeyGeneratorTest.java @@ -24,10 +24,7 @@ import com.auth0.jwt.JWT; import com.auth0.jwt.JWTVerifier; import com.auth0.jwt.algorithms.Algorithm; import com.auth0.jwt.interfaces.DecodedJWT; -import java.util.Map; import org.apache.polaris.core.PolarisCallContext; -import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.PolarisBaseEntity; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; @@ -49,23 +46,6 @@ public class JWTSymmetricKeyGeneratorTest { @Test public void testJWTSymmetricKeyGenerator() { PolarisCallContext polarisCallContext = new PolarisCallContext(null, null, null, null); - CallContext.setCurrentContext( - new CallContext() { - @Override - public RealmContext getRealmContext() { - return () -> "realm"; - } - - @Override - public PolarisCallContext getPolarisCallContext() { - return polarisCallContext; - } - - @Override - public Map<String, Object> contextVariables() { - return Map.of(); - } - }); PolarisMetaStoreManager metastoreManager = Mockito.mock(PolarisMetaStoreManager.class); String mainSecret = "test_secret"; String clientId = "test_client_id"; diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java index 662f88bb0..925991784 100644 --- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/BatchFileCleanupTaskHandlerTest.java @@ -94,114 +94,110 @@ public class BatchFileCleanupTaskHandlerTest { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } - }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - BatchFileCleanupTaskHandler handler = - new BatchFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } + }; + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + BatchFileCleanupTaskHandler handler = + new BatchFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - long snapshotId1 = 100L; - ManifestFile manifestFile1 = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", snapshotId1, "dataFile1.parquet", "dataFile2.parquet"); - ManifestFile manifestFile2 = - TaskTestUtils.manifestFile( - fileIO, "manifest2.avro", snapshotId1, "dataFile3.parquet", "dataFile4.parquet"); - Snapshot snapshot = - TaskTestUtils.newSnapshot( - fileIO, "manifestList.avro", 1, snapshotId1, 99L, manifestFile1, manifestFile2); - StatisticsFile statisticsFile1 = - TaskTestUtils.writeStatsFile( - snapshot.snapshotId(), - snapshot.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - PartitionStatisticsFile partitionStatisticsFile1 = - 
TaskTestUtils.writePartitionStatsFile( - snapshot.snapshotId(), - "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", - fileIO); - String firstMetadataFile = "v1-295495059.metadata.json"; - TableMetadata firstMetadata = - TaskTestUtils.writeTableMetadata( - fileIO, - firstMetadataFile, - List.of(statisticsFile1), - List.of(partitionStatisticsFile1), - snapshot); - assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); + long snapshotId1 = 100L; + ManifestFile manifestFile1 = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId1, "dataFile1.parquet", "dataFile2.parquet"); + ManifestFile manifestFile2 = + TaskTestUtils.manifestFile( + fileIO, "manifest2.avro", snapshotId1, "dataFile3.parquet", "dataFile4.parquet"); + Snapshot snapshot = + TaskTestUtils.newSnapshot( + fileIO, "manifestList.avro", 1, snapshotId1, 99L, manifestFile1, manifestFile2); + StatisticsFile statisticsFile1 = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + PartitionStatisticsFile partitionStatisticsFile1 = + TaskTestUtils.writePartitionStatsFile( + snapshot.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); + String firstMetadataFile = "v1-295495059.metadata.json"; + TableMetadata firstMetadata = + TaskTestUtils.writeTableMetadata( + fileIO, + firstMetadataFile, + List.of(statisticsFile1), + List.of(partitionStatisticsFile1), + snapshot); + assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); - ManifestFile manifestFile3 = - TaskTestUtils.manifestFile( - fileIO, "manifest3.avro", snapshot.snapshotId() + 1, "dataFile5.parquet"); - Snapshot snapshot2 = - TaskTestUtils.newSnapshot( - fileIO, - "manifestList2.avro", - snapshot.sequenceNumber() + 1, - snapshot.snapshotId() + 1, - snapshot.snapshotId(), - manifestFile1, - manifestFile3); // exclude manifest2 from the new snapshot - StatisticsFile 
statisticsFile2 = - TaskTestUtils.writeStatsFile( - snapshot2.snapshotId(), - snapshot2.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - PartitionStatisticsFile partitionStatisticsFile2 = - TaskTestUtils.writePartitionStatsFile( - snapshot2.snapshotId(), - "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", - fileIO); - String secondMetadataFile = "v1-295495060.metadata.json"; - TableMetadata secondMetadata = - TaskTestUtils.writeTableMetadata( - fileIO, - secondMetadataFile, - firstMetadata, - firstMetadataFile, - List.of(statisticsFile2), - List.of(partitionStatisticsFile2), - snapshot2); - assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); - assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue(); + ManifestFile manifestFile3 = + TaskTestUtils.manifestFile( + fileIO, "manifest3.avro", snapshot.snapshotId() + 1, "dataFile5.parquet"); + Snapshot snapshot2 = + TaskTestUtils.newSnapshot( + fileIO, + "manifestList2.avro", + snapshot.sequenceNumber() + 1, + snapshot.snapshotId() + 1, + snapshot.snapshotId(), + manifestFile1, + manifestFile3); // exclude manifest2 from the new snapshot + StatisticsFile statisticsFile2 = + TaskTestUtils.writeStatsFile( + snapshot2.snapshotId(), + snapshot2.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + PartitionStatisticsFile partitionStatisticsFile2 = + TaskTestUtils.writePartitionStatsFile( + snapshot2.snapshotId(), + "/metadata/" + "partition-stats-" + UUID.randomUUID() + ".parquet", + fileIO); + String secondMetadataFile = "v1-295495060.metadata.json"; + TableMetadata secondMetadata = + TaskTestUtils.writeTableMetadata( + fileIO, + secondMetadataFile, + firstMetadata, + firstMetadataFile, + List.of(statisticsFile2), + List.of(partitionStatisticsFile2), + snapshot2); + assertThat(TaskUtils.exists(firstMetadataFile, fileIO)).isTrue(); + assertThat(TaskUtils.exists(secondMetadataFile, fileIO)).isTrue(); - List<String> cleanupFiles = - 
Stream.of( - secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file), - secondMetadata.statisticsFiles().stream().map(StatisticsFile::path), - secondMetadata.partitionStatisticsFiles().stream() - .map(PartitionStatisticsFile::path)) - .flatMap(s -> s) - .filter(file -> TaskUtils.exists(file, fileIO)) - .toList(); + List<String> cleanupFiles = + Stream.of( + secondMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file), + secondMetadata.statisticsFiles().stream().map(StatisticsFile::path), + secondMetadata.partitionStatisticsFiles().stream() + .map(PartitionStatisticsFile::path)) + .flatMap(s -> s) + .filter(file -> TaskUtils.exists(file, fileIO)) + .toList(); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) - .withData( - new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, cleanupFiles)) - .setName(UUID.randomUUID().toString()) - .build(); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) + .withData( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask(tableIdentifier, cleanupFiles)) + .setName(UUID.randomUUID().toString()) + .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); - for (String cleanupFile : cleanupFiles) { - assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile); - } + for (String cleanupFile : cleanupFiles) { + assertThatPredicate((String file) -> TaskUtils.exists(file, fileIO)).rejects(cleanupFile); } } @@ -211,45 +207,43 @@ public class BatchFileCleanupTaskHandlerTest { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try 
(CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = new InMemoryFileIO(); - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - BatchFileCleanupTaskHandler handler = - new BatchFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - long snapshotId = 100L; - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); - TestSnapshot snapshot = - TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); - String metadataFile = "v1-49494949.metadata.json"; - StatisticsFile statisticsFile = - TaskTestUtils.writeStatsFile( - snapshot.snapshotId(), - snapshot.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + FileIO fileIO = new InMemoryFileIO(); + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + BatchFileCleanupTaskHandler handler = + new BatchFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + long snapshotId = 100L; + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); + TestSnapshot snapshot = + TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); + String metadataFile = "v1-49494949.metadata.json"; + StatisticsFile statisticsFile = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + TaskTestUtils.writeTableMetadata(fileIO, metadataFile, 
List.of(statisticsFile), snapshot); - fileIO.deleteFile(statisticsFile.path()); - assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); + fileIO.deleteFile(statisticsFile.path()); + assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) - .withData( - new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(statisticsFile.path()))) - .setName(UUID.randomUUID().toString()) - .build(); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) + .withData( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( + tableIdentifier, List.of(statisticsFile.path()))) + .setName(UUID.randomUUID().toString()) + .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - } + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); } @Test @@ -258,77 +252,73 @@ public class BatchFileCleanupTaskHandlerTest { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - Map<String, AtomicInteger> retryCounter = new HashMap<>(); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + Map<String, AtomicInteger> retryCounter = new HashMap<>(); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } - @Override - public void deleteFile(String location) { - int attempts = - retryCounter - .computeIfAbsent(location, k -> new 
AtomicInteger(0)) - .incrementAndGet(); - if (attempts < 3) { - throw new RuntimeException("Simulating failure to test retries"); - } else { - super.deleteFile(location); - } + @Override + public void deleteFile(String location) { + int attempts = + retryCounter.computeIfAbsent(location, k -> new AtomicInteger(0)).incrementAndGet(); + if (attempts < 3) { + throw new RuntimeException("Simulating failure to test retries"); + } else { + super.deleteFile(location); } - }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - BatchFileCleanupTaskHandler handler = - new BatchFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - long snapshotId = 100L; - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); - TestSnapshot snapshot = - TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); - String metadataFile = "v1-49494949.metadata.json"; - StatisticsFile statisticsFile = - TaskTestUtils.writeStatsFile( - snapshot.snapshotId(), - snapshot.sequenceNumber(), - "/metadata/" + UUID.randomUUID() + ".stats", - fileIO); - TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); - assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isTrue(); + } + }; + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + BatchFileCleanupTaskHandler handler = + new BatchFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + long snapshotId = 100L; + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet", "dataFile2.parquet"); + TestSnapshot snapshot = + TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1, snapshotId, 99L, manifestFile); + String metadataFile = "v1-49494949.metadata.json"; 
+ StatisticsFile statisticsFile = + TaskTestUtils.writeStatsFile( + snapshot.snapshotId(), + snapshot.sequenceNumber(), + "/metadata/" + UUID.randomUUID() + ".stats", + fileIO); + TaskTestUtils.writeTableMetadata(fileIO, metadataFile, List.of(statisticsFile), snapshot); + assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isTrue(); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) - .withData( - new BatchFileCleanupTaskHandler.BatchFileCleanupTask( - tableIdentifier, List.of(statisticsFile.path()))) - .setName(UUID.randomUUID().toString()) - .build(); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.BATCH_FILE_CLEANUP) + .withData( + new BatchFileCleanupTaskHandler.BatchFileCleanupTask( + tableIdentifier, List.of(statisticsFile.path()))) + .setName(UUID.randomUUID().toString()) + .build(); - CompletableFuture<Void> future = - CompletableFuture.runAsync( - () -> { - CallContext.setCurrentContext(callCtx); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - handler.handleTask(task, callCtx); // this will schedule the batch deletion - }); + CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> { + CallContext.setCurrentContext(callCtx); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + handler.handleTask(task, callCtx); // this will schedule the batch deletion + }); - // Wait for all async tasks to finish - future.join(); + // Wait for all async tasks to finish + future.join(); - // Check if the file was successfully deleted after retries - assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); + // Check if the file was successfully deleted after retries + assertThat(TaskUtils.exists(statisticsFile.path(), fileIO)).isFalse(); - // Ensure that retries happened as expected - assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue(); - 
assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3); - } + // Ensure that retries happened as expected + assertThat(retryCounter.containsKey(statisticsFile.path())).isTrue(); + assertThat(retryCounter.get(statisticsFile.path()).get()).isEqualTo(3); } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java index 39cd619bd..58fa14d7e 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/ManifestFileCleanupTaskHandlerTest.java @@ -93,32 +93,28 @@ class ManifestFileCleanupTaskHandlerTest { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = new InMemoryFileIO(); - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + FileIO fileIO = new InMemoryFileIO(); + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", 1L, "dataFile1.parquet", "dataFile2.parquet"); - fileIO.deleteFile(manifestFile.path()); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, - 
Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - } + ManifestFileCleanupTaskHandler handler = + new ManifestFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", 1L, "dataFile1.parquet", "dataFile2.parquet"); + fileIO.deleteFile(manifestFile.path()); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) + .withData( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) + .setName(UUID.randomUUID().toString()) + .build(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); } @Test @@ -127,30 +123,27 @@ class ManifestFileCleanupTaskHandlerTest { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = new InMemoryFileIO(); - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - ManifestFile manifestFile = - TaskTestUtils.manifestFile( - fileIO, "manifest1.avro", 100L, "dataFile1.parquet", "dataFile2.parquet"); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, - 
Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - } + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + FileIO fileIO = new InMemoryFileIO(); + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + ManifestFileCleanupTaskHandler handler = + new ManifestFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + ManifestFile manifestFile = + TaskTestUtils.manifestFile( + fileIO, "manifest1.avro", 100L, "dataFile1.parquet", "dataFile2.parquet"); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) + .withData( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) + .setName(UUID.randomUUID().toString()) + .build(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); } @Test @@ -159,47 +152,44 @@ class ManifestFileCleanupTaskHandlerTest { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } - }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - String dataFile1Path = "dataFile1.parquet"; - 
OutputFile dataFile1 = fileIO.newOutputFile(dataFile1Path); - PositionOutputStream out1 = dataFile1.createOrOverwrite(); - out1.write("the data".getBytes(UTF_8)); - out1.close(); - String dataFile2Path = "dataFile2.parquet"; - OutputFile dataFile2 = fileIO.newOutputFile(dataFile2Path); - PositionOutputStream out2 = dataFile2.createOrOverwrite(); - out2.write("the data".getBytes(UTF_8)); - out2.close(); - ManifestFile manifestFile = - TaskTestUtils.manifestFile(fileIO, "manifest1.avro", 100L, dataFile1Path, dataFile2Path); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, - Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); - assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); - } + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } + }; + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + ManifestFileCleanupTaskHandler handler = + new ManifestFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + String dataFile1Path = "dataFile1.parquet"; + OutputFile dataFile1 = fileIO.newOutputFile(dataFile1Path); + PositionOutputStream out1 = dataFile1.createOrOverwrite(); + out1.write("the data".getBytes(UTF_8)); + out1.close(); + String dataFile2Path = "dataFile2.parquet"; + OutputFile dataFile2 = fileIO.newOutputFile(dataFile2Path); + PositionOutputStream out2 = 
dataFile2.createOrOverwrite(); + out2.write("the data".getBytes(UTF_8)); + out2.close(); + ManifestFile manifestFile = + TaskTestUtils.manifestFile(fileIO, "manifest1.avro", 100L, dataFile1Path, dataFile2Path); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) + .withData( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) + .setName(UUID.randomUUID().toString()) + .build(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); } @Test @@ -208,62 +198,57 @@ class ManifestFileCleanupTaskHandlerTest { new PolarisCallContext( metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(), new PolarisDefaultDiagServiceImpl()); - try (CallContext callCtx = CallContext.of(realmContext, polarisCallContext)) { - CallContext.setCurrentContext(callCtx); - Map<String, AtomicInteger> retryCounter = new HashMap<>(); - FileIO fileIO = - new InMemoryFileIO() { - @Override - public void close() { - // no-op - } + CallContext callCtx = CallContext.of(realmContext, polarisCallContext); + CallContext.setCurrentContext(callCtx); + Map<String, AtomicInteger> retryCounter = new HashMap<>(); + FileIO fileIO = + new InMemoryFileIO() { + @Override + public void close() { + // no-op + } - @Override - public void deleteFile(String location) { - int attempts = - retryCounter - .computeIfAbsent(location, k -> new AtomicInteger(0)) - .incrementAndGet(); - if (attempts < 3) { - throw new RuntimeException("I'm failing to test retries"); - } else { - // succeed on the third attempt - super.deleteFile(location); - } + @Override + public void deleteFile(String location) { + int attempts = + 
retryCounter.computeIfAbsent(location, k -> new AtomicInteger(0)).incrementAndGet(); + if (attempts < 3) { + throw new RuntimeException("I'm failing to test retries"); + } else { + // succeed on the third attempt + super.deleteFile(location); } - }; + } + }; - TableIdentifier tableIdentifier = - TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); - ManifestFileCleanupTaskHandler handler = - new ManifestFileCleanupTaskHandler( - buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); - String dataFile1Path = "dataFile1.parquet"; - OutputFile dataFile1 = fileIO.newOutputFile(dataFile1Path); - PositionOutputStream out1 = dataFile1.createOrOverwrite(); - out1.write("the data".getBytes(UTF_8)); - out1.close(); - String dataFile2Path = "dataFile2.parquet"; - OutputFile dataFile2 = fileIO.newOutputFile(dataFile2Path); - PositionOutputStream out2 = dataFile2.createOrOverwrite(); - out2.write("the data".getBytes(UTF_8)); - out2.close(); - ManifestFile manifestFile = - TaskTestUtils.manifestFile(fileIO, "manifest1.avro", 100L, dataFile1Path, dataFile2Path); - TaskEntity task = - new TaskEntity.Builder() - .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) - .withData( - new ManifestFileCleanupTaskHandler.ManifestCleanupTask( - tableIdentifier, - Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) - .setName(UUID.randomUUID().toString()) - .build(); - addTaskLocation(task); - assertThatPredicate(handler::canHandleTask).accepts(task); - assertThat(handler.handleTask(task, callCtx)).isTrue(); - assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); - assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); - } + TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1"); + ManifestFileCleanupTaskHandler handler = + new ManifestFileCleanupTaskHandler( + buildTaskFileIOSupplier(fileIO), Executors.newSingleThreadExecutor()); + String dataFile1Path 
= "dataFile1.parquet"; + OutputFile dataFile1 = fileIO.newOutputFile(dataFile1Path); + PositionOutputStream out1 = dataFile1.createOrOverwrite(); + out1.write("the data".getBytes(UTF_8)); + out1.close(); + String dataFile2Path = "dataFile2.parquet"; + OutputFile dataFile2 = fileIO.newOutputFile(dataFile2Path); + PositionOutputStream out2 = dataFile2.createOrOverwrite(); + out2.write("the data".getBytes(UTF_8)); + out2.close(); + ManifestFile manifestFile = + TaskTestUtils.manifestFile(fileIO, "manifest1.avro", 100L, dataFile1Path, dataFile2Path); + TaskEntity task = + new TaskEntity.Builder() + .withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP) + .withData( + new ManifestFileCleanupTaskHandler.ManifestCleanupTask( + tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile)))) + .setName(UUID.randomUUID().toString()) + .build(); + addTaskLocation(task); + assertThatPredicate(handler::canHandleTask).accepts(task); + assertThat(handler.handleTask(task, callCtx)).isTrue(); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile1Path); + assertThatPredicate((String f) -> TaskUtils.exists(f, fileIO)).rejects(dataFile2Path); } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/PolarisIntegrationTestFixture.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/PolarisIntegrationTestFixture.java index 259cfc648..2dc0400bc 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/PolarisIntegrationTestFixture.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/PolarisIntegrationTestFixture.java @@ -115,13 +115,12 @@ public class PolarisIntegrationTestFixture { PolarisCallContext polarisContext = new PolarisCallContext( metaStoreSession, helper.diagServices, helper.configurationStore, helper.clock); - try (CallContext ctx = CallContext.of(realmContext, polarisContext)) { - CallContext.setCurrentContext(ctx); + try { 
PolarisMetaStoreManager metaStoreManager = - helper.metaStoreManagerFactory.getOrCreateMetaStoreManager(ctx.getRealmContext()); + helper.metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext); EntityResult principal = metaStoreManager.readEntityByName( - ctx.getPolarisCallContext(), + polarisContext, null, PolarisEntityType.PRINCIPAL, PolarisEntitySubType.NULL_SUBTYPE, @@ -129,7 +128,7 @@ public class PolarisIntegrationTestFixture { Map<String, String> propertiesMap = readInternalProperties(principal); return metaStoreManager - .loadPrincipalSecrets(ctx.getPolarisCallContext(), propertiesMap.get("client_id")) + .loadPrincipalSecrets(polarisContext, propertiesMap.get("client_id")) .getPrincipalSecrets(); } finally { CallContext.unsetCurrentContext(); diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index 5f9cb1794..e99513155 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -255,7 +255,6 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog CatalogProperties.FILE_IO_IMPL); } - callContext.closeables().addCloseable(this); this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(metricsReporter()); closeableGroup.setSuppressCloseFailure(true); diff --git a/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java b/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java index 96445f949..5ee0cea23 100644 --- a/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java +++ 
b/service/common/src/main/java/org/apache/polaris/service/context/catalog/PolarisCallContextCatalogFactory.java @@ -99,8 +99,6 @@ public class PolarisCallContextCatalogFactory implements CallContextCatalogFacto fileIOFactory, polarisEventListener); - context.contextVariables().put(CallContext.REQUEST_PATH_CATALOG_INSTANCE_KEY, catalogInstance); - CatalogEntity catalog = CatalogEntity.of(baseCatalogEntity); Map<String, String> catalogProperties = new HashMap<>(catalog.getPropertiesAsMap()); String defaultBaseLocation = catalog.getDefaultBaseLocation(); diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java index 64a85df32..28a181ef8 100644 --- a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java +++ b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableMap; import jakarta.annotation.Nonnull; import java.lang.reflect.Method; import java.time.Clock; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.iceberg.Schema; @@ -154,11 +153,6 @@ public class FileIOFactoryTest { testServices.configurationStore(), Mockito.mock(Clock.class)); } - - @Override - public Map<String, Object> contextVariables() { - return new HashMap<>(); - } }; } diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 4d9ec5654..72725c39e 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -27,7 +27,6 @@ import java.security.Principal; import java.time.Clock; import java.time.Instant; import java.util.Date; -import java.util.HashMap; 
import java.util.Map; import java.util.Optional; import java.util.Set; @@ -178,13 +177,7 @@ public record TestServices( configurationStore, Mockito.mock(Clock.class)); } - - @Override - public Map<String, Object> contextVariables() { - return new HashMap<>(); - } }; - CallContext.setCurrentContext(callContext); PolarisEntityManager entityManager = realmEntityManagerFactory.getOrCreateEntityManager(realmContext); PolarisMetaStoreManager metaStoreManager =