This is an automated email from the ASF dual-hosted git repository. emaynard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push: new e5e0c0d9b Event Listeners (#922) e5e0c0d9b is described below commit e5e0c0d9b271550949abc3bb1cd6e0733204b16c Author: Andrew Guterman <andrew.guterm...@gmail.com> AuthorDate: Wed May 7 10:28:29 2025 -0700 Event Listeners (#922) Implementation of event listeners discussed [here](https://lists.apache.org/thread/03yz5wolkvy8l7rbcwjnqdq1bl8p065v). I decided to keep this implementation generic and not take a dependency on Jakarta Events nor Vertx busses. It's easy to extend this, either within Polaris or in an external PolarisEventListener, and handle events however one wishes. Some high level notes: - PolarisEventListener is the main interface with all the event methods such as `onBeforeRequestRateLimited` - DefaultPolarisEventListener is an empty implementation which allows users to only partially implement event handlers - `polaris.events.type` is the config that lets you specify your event listener implementation --- .../src/main/resources/application.properties | 2 + .../quarkus/config/ProductionReadinessChecks.java | 12 +++ .../service/quarkus/config/QuarkusProducers.java | 9 ++ .../QuarkusPolarisEventListenerConfiguration.java | 32 +++++++ .../quarkus/task/QuarkusTaskExecutorImpl.java | 8 +- .../quarkus/admin/PolarisAuthzTestBase.java | 13 ++- .../quarkus/catalog/GenericTableCatalogTest.java | 4 +- .../catalog/IcebergCatalogHandlerAuthzTest.java | 3 +- .../quarkus/catalog/IcebergCatalogTest.java | 83 +++++++++++++----- .../quarkus/catalog/IcebergCatalogViewTest.java | 57 ++++++++++++- .../service/quarkus/catalog/PolicyCatalogTest.java | 4 +- .../quarkus/ratelimiter/RateLimiterFilterTest.java | 13 ++- .../polaris/service/quarkus/test/TestData.java | 36 ++++++++ .../service/catalog/iceberg/IcebergCatalog.java | 31 ++++++- .../context/PolarisCallContextCatalogFactory.java | 9 +- .../service/events/AfterTableCommitedEvent.java | 34 ++++++++ .../service/events/AfterTableRefreshedEvent.java | 28 ++++++ .../service/events/AfterTaskAttemptedEvent.java | 34 ++++++++ .../service/events/AfterViewCommitedEvent.java | 33 ++++++++ .../service/events/AfterViewRefreshedEvent.java | 28 ++++++ .../events/BeforeRequestRateLimitedEvent.java | 28 ++++++ .../service/events/BeforeTableCommitedEvent.java | 35 ++++++++ .../service/events/BeforeTableRefreshedEvent.java | 29 +++++++ .../service/events/BeforeTaskAttemptedEvent.java | 32 +++++++ .../service/events/BeforeViewCommitedEvent.java | 34 ++++++++ .../service/events/BeforeViewRefreshedEvent.java | 29 +++++++ .../service/events/NoOpPolarisEventListener.java | 27 ++++++ .../polaris/service/events/PolarisEvent.java | 25 ++++++ .../service/events/PolarisEventListener.java | 58 +++++++++++++ .../service/events/TestPolarisEventListener.java | 91 ++++++++++++++++++++ .../service/ratelimiter/RateLimiterFilter.java | 9 +- .../polaris/service/task/TaskExecutorImpl.java | 95 ++++++++++++--------- .../service/catalog/io/FileIOFactoryTest.java | 3 +- .../polaris/service/task/TaskExecutorImplTest.java | 99 ++++++++++++++++++++++ .../org/apache/polaris/service/TestServices.java | 12 ++- 35 files changed, 999 insertions(+), 80 deletions(-) diff --git a/quarkus/defaults/src/main/resources/application.properties b/quarkus/defaults/src/main/resources/application.properties index d9b82239e..0482f3de0 100644 --- a/quarkus/defaults/src/main/resources/application.properties +++ b/quarkus/defaults/src/main/resources/application.properties @@ -124,6 +124,8 @@ polaris.secrets-manager.type=in-memory polaris.file-io.type=default +polaris.event-listener.type=no-op + polaris.log.request-id-header-name=Polaris-Request-Id # polaris.log.mdc.aid=polaris # polaris.log.mdc.sid=polaris-service diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java index 5d9524f4c..2389537f1 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java @@ -36,6 +36,8 @@ import org.apache.polaris.service.auth.AuthenticationType; import org.apache.polaris.service.context.DefaultRealmContextResolver; import org.apache.polaris.service.context.RealmContextResolver; import org.apache.polaris.service.context.TestRealmContextResolver; +import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.TestPolarisEventListener; import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory; import org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration; import org.eclipse.microprofile.config.Config; @@ -161,6 +163,16 @@ public class ProductionReadinessChecks { return ProductionReadinessCheck.OK; } + @Produces + public ProductionReadinessCheck checkPolarisEventListener( + PolarisEventListener polarisEventListener) { + if (polarisEventListener instanceof TestPolarisEventListener) { + return ProductionReadinessCheck.of( + Error.of("TestPolarisEventListener is intended for tests only.", "polaris.events.type")); + } + return ProductionReadinessCheck.OK; + } + private static String authRealmSegment(String realm) { return realm.equals(QuarkusAuthenticationConfiguration.DEFAULT_REALM_KEY) ? "" : realm + "."; } diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java index 122921e82..89bf2c6fb 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/config/QuarkusProducers.java @@ -62,11 +62,13 @@ import org.apache.polaris.service.config.RealmEntityManagerFactory; import org.apache.polaris.service.context.RealmContextConfiguration; import org.apache.polaris.service.context.RealmContextFilter; import org.apache.polaris.service.context.RealmContextResolver; +import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration; import org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationRealmConfiguration; import org.apache.polaris.service.quarkus.auth.external.tenant.OidcTenantResolver; import org.apache.polaris.service.quarkus.catalog.io.QuarkusFileIOConfiguration; import org.apache.polaris.service.quarkus.context.QuarkusRealmContextConfiguration; +import org.apache.polaris.service.quarkus.events.QuarkusPolarisEventListenerConfiguration; import org.apache.polaris.service.quarkus.persistence.QuarkusPersistenceConfiguration; import org.apache.polaris.service.quarkus.ratelimiter.QuarkusRateLimiterFilterConfiguration; import org.apache.polaris.service.quarkus.ratelimiter.QuarkusTokenBucketConfiguration; @@ -155,6 +157,13 @@ public class QuarkusProducers { return fileIOFactories.select(Identifier.Literal.of(config.type())).get(); } + @Produces + public PolarisEventListener polarisEventListener( + QuarkusPolarisEventListenerConfiguration config, + @Any Instance<PolarisEventListener> polarisEventListeners) { + return polarisEventListeners.select(Identifier.Literal.of(config.type())).get(); + } + @Produces public MetaStoreManagerFactory metaStoreManagerFactory( QuarkusPersistenceConfiguration config, diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java new file mode 100644 index 000000000..8921c726c --- /dev/null +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.quarkus.events; + +import io.quarkus.runtime.annotations.StaticInitSafe; +import io.smallrye.config.ConfigMapping; + +@StaticInitSafe +@ConfigMapping(prefix = "polaris.event-listener") +public interface QuarkusPolarisEventListenerConfiguration { + /** + * The type of the event listener to use. Must be a registered {@link + * org.apache.polaris.service.events.PolarisEventListener} identifier. + */ + String type(); +} diff --git a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java index 8f38de648..3e16edb5a 100644 --- a/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java +++ b/quarkus/service/src/main/java/org/apache/polaris/service/quarkus/task/QuarkusTaskExecutorImpl.java @@ -29,6 +29,7 @@ import jakarta.inject.Inject; import java.util.concurrent.ExecutorService; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.quarkus.tracing.QuarkusTracingFilter; import org.apache.polaris.service.task.TaskExecutorImpl; import org.apache.polaris.service.task.TaskFileIOSupplier; @@ -39,7 +40,7 @@ public class QuarkusTaskExecutorImpl extends TaskExecutorImpl { private final Tracer tracer; public QuarkusTaskExecutorImpl() { - this(null, null, null, null); + this(null, null, null, null, null); } @Inject @@ -47,8 +48,9 @@ public class QuarkusTaskExecutorImpl extends TaskExecutorImpl { @Identifier("task-executor") ExecutorService executorService, MetaStoreManagerFactory metaStoreManagerFactory, TaskFileIOSupplier fileIOSupplier, - Tracer tracer) { - super(executorService, metaStoreManagerFactory, fileIOSupplier); + Tracer tracer, + PolarisEventListener polarisEventListener) { + super(executorService, metaStoreManagerFactory, fileIOSupplier, polarisEventListener); this.tracer = tracer; } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java index 6f3498d31..5761812f4 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java @@ -88,6 +88,7 @@ import org.apache.polaris.service.config.DefaultConfigurationStore; import org.apache.polaris.service.config.RealmEntityManagerFactory; import org.apache.polaris.service.context.CallContextCatalogFactory; import org.apache.polaris.service.context.PolarisCallContextCatalogFactory; +import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.PolicyIdentifier; @@ -188,6 +189,7 @@ public abstract class PolarisAuthzTestBase { @Inject protected PolarisDiagnostics diagServices; @Inject protected Clock clock; @Inject protected FileIOFactory fileIOFactory; + @Inject protected PolarisEventListener polarisEventListener; protected IcebergCatalog baseCatalog; protected GenericTableCatalog genericTableCatalog; @@ -469,7 +471,8 @@ public abstract class PolarisAuthzTestBase { passthroughView, securityContext, Mockito.mock(), - fileIOFactory); + fileIOFactory, + polarisEventListener); this.baseCatalog.initialize( CATALOG_NAME, ImmutableMap.of( @@ -485,7 +488,7 @@ public abstract class PolarisAuthzTestBase { extends PolarisCallContextCatalogFactory { public TestPolarisCallContextCatalogFactory() { - super(null, null, null, null, null); + super(null, null, null, null, null, null); } @Inject @@ -494,13 +497,15 @@ public abstract class PolarisAuthzTestBase { MetaStoreManagerFactory metaStoreManagerFactory, UserSecretsManagerFactory userSecretsManagerFactory, TaskExecutor taskExecutor, - FileIOFactory fileIOFactory) { + FileIOFactory fileIOFactory, + PolarisEventListener polarisEventListener) { super( entityManagerFactory, metaStoreManagerFactory, userSecretsManagerFactory, taskExecutor, - fileIOFactory); + fileIOFactory, + polarisEventListener); } @Override diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java index 96b1010b4..0b6af6e8f 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/GenericTableCatalogTest.java @@ -81,6 +81,7 @@ import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.config.RealmEntityManagerFactory; +import org.apache.polaris.service.events.NoOpPolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.task.TaskExecutor; import org.assertj.core.api.Assertions; @@ -264,7 +265,8 @@ public class GenericTableCatalogTest { passthroughView, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + new NoOpPolarisEventListener()); this.icebergCatalog.initialize( CATALOG_NAME, ImmutableMap.of( diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java index 11a938f31..e74b7d641 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogHandlerAuthzTest.java @@ -1808,7 +1808,8 @@ public class IcebergCatalogHandlerAuthzTest extends PolarisAuthzTestBase { userSecretsManagerFactory, Mockito.mock(), new DefaultFileIOFactory( - realmEntityManagerFactory, managerFactory, new PolarisConfigurationStore() {})) { + realmEntityManagerFactory, managerFactory, new PolarisConfigurationStore() {}), + polarisEventListener) { @Override public Catalog createCallContextCatalog( CallContext context, diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java index 05ad0b54b..d81edd1d4 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogTest.java @@ -19,7 +19,6 @@ package org.apache.polaris.service.quarkus.catalog; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.iceberg.types.Types.NestedField.required; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.doReturn; @@ -116,8 +115,15 @@ import org.apache.polaris.service.catalog.io.ExceptionMappingFileIO; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory; import org.apache.polaris.service.config.RealmEntityManagerFactory; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.TestPolarisEventListener; import org.apache.polaris.service.exception.FakeAzureHttpResponse; import org.apache.polaris.service.exception.IcebergExceptionMapper; +import org.apache.polaris.service.quarkus.test.TestData; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.task.TableCleanupTaskHandler; import org.apache.polaris.service.task.TaskExecutor; @@ -164,17 +170,14 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { "polaris.features.defaults.\"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST\"", "true", "polaris.features.defaults.\"SUPPORTED_CATALOG_STORAGE_TYPES\"", - "[\"FILE\"]"); + "[\"FILE\"]", + "polaris.event-listener.type", + "test"); } } - protected static final Namespace NS = Namespace.of("newdb"); - protected static final TableIdentifier TABLE = TableIdentifier.of(NS, "table"); - protected static final Schema SCHEMA = - new Schema( - required(3, "id", Types.IntegerType.get(), "unique ID 🤪"), - required(4, "data", Types.StringType.get())); private static final String VIEW_QUERY = "select * from ns1.layer1_table"; + public static final String CATALOG_NAME = "polaris-catalog"; public static final String TEST_ACCESS_KEY = "test_access_key"; public static final String SECRET_ACCESS_KEY = "secret_access_key"; @@ -198,6 +201,7 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { @Inject PolarisStorageIntegrationProvider storageIntegrationProvider; @Inject UserSecretsManagerFactory userSecretsManagerFactory; @Inject PolarisDiagnostics diagServices; + @Inject PolarisEventListener polarisEventListener; private IcebergCatalog catalog; private CallContext callContext; @@ -210,6 +214,7 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { private FileIOFactory fileIOFactory; private PolarisEntity catalogEntity; private SecurityContext securityContext; + private TestPolarisEventListener testPolarisEventListener; @BeforeAll public static void setUpMocks() { @@ -319,6 +324,7 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { .thenReturn((PolarisStorageIntegration) storageIntegration); this.catalog = initCatalog("my-catalog", ImmutableMap.of()); + testPolarisEventListener = (TestPolarisEventListener) polarisEventListener; } @AfterEach @@ -354,7 +360,8 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { passthroughView, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + polarisEventListener); ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder() .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO") @@ -649,7 +656,8 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { passthroughView, securityContext, Mockito.mock(TaskExecutor.class), - fileIOFactory); + fileIOFactory, + polarisEventListener); catalog.initialize( CATALOG_NAME, ImmutableMap.of( @@ -984,7 +992,8 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { passthroughView, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + polarisEventListener); catalog.initialize( catalogWithoutStorage, ImmutableMap.of( @@ -1050,7 +1059,8 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { passthroughView, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + polarisEventListener); catalog.initialize( catalogName, ImmutableMap.of( @@ -1596,7 +1606,8 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { passthroughView, securityContext, Mockito.mock(), - fileIOFactory); + fileIOFactory, + polarisEventListener); noPurgeCatalog.initialize( noPurgeCatalogName, ImmutableMap.of( @@ -1704,7 +1715,8 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { passthroughView, securityContext, Mockito.mock(), - measured); + measured, + polarisEventListener); catalog.initialize( CATALOG_NAME, ImmutableMap.of( @@ -1714,8 +1726,8 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { .as("Nothing was created yet") .isEqualTo(0); - catalog.createNamespace(NS); - Table table = catalog.buildTable(TABLE, SCHEMA).create(); + catalog.createNamespace(TestData.NAMESPACE); + Table table = catalog.buildTable(TestData.TABLE, TestData.SCHEMA).create(); // Asserting greaterThan 0 is sufficient for validating that the wrapper works without making // assumptions about the @@ -1727,7 +1739,9 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { .as("A table was read and written, but a trip to storage was made") .isEqualTo(0); - Assertions.assertThat(catalog.dropTable(TABLE)).as("Table deletion should succeed").isTrue(); + Assertions.assertThat(catalog.dropTable(TestData.TABLE)) + .as("Table deletion should succeed") + .isTrue(); TaskEntity taskEntity = TaskEntity.of( metaStoreManager @@ -1801,7 +1815,8 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { passthroughView, securityContext, Mockito.mock(TaskExecutor.class), - fileIOFactory); + fileIOFactory, + polarisEventListener); catalog.initialize( CATALOG_NAME, ImmutableMap.of( @@ -1849,7 +1864,8 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { passthroughView, securityContext, Mockito.mock(TaskExecutor.class), - fileIOFactory); + fileIOFactory, + polarisEventListener); catalog.initialize( CATALOG_NAME, ImmutableMap.of( @@ -1924,6 +1940,35 @@ public abstract class IcebergCatalogTest extends CatalogTests<IcebergCatalog> { } } + @Test + public void testEventsAreEmitted() { + IcebergCatalog catalog = catalog(); + catalog.createNamespace(TestData.NAMESPACE); + Table table = catalog.buildTable(TestData.TABLE, TestData.SCHEMA).create(); + + String key = "foo"; + String valOld = "bar1"; + String valNew = "bar2"; + table.updateProperties().set(key, valOld).commit(); + table.updateProperties().set(key, valNew).commit(); + + var beforeRefreshEvent = testPolarisEventListener.getLatest(BeforeTableRefreshedEvent.class); + Assertions.assertThat(beforeRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE); + + var afterRefreshEvent = testPolarisEventListener.getLatest(AfterTableRefreshedEvent.class); + Assertions.assertThat(afterRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE); + + var beforeTableEvent = testPolarisEventListener.getLatest(BeforeTableCommitedEvent.class); + Assertions.assertThat(beforeTableEvent.identifier()).isEqualTo(TestData.TABLE); + Assertions.assertThat(beforeTableEvent.base().properties().get(key)).isEqualTo(valOld); + Assertions.assertThat(beforeTableEvent.metadata().properties().get(key)).isEqualTo(valNew); + + var afterTableEvent = testPolarisEventListener.getLatest(AfterTableCommitedEvent.class); + Assertions.assertThat(afterTableEvent.identifier()).isEqualTo(TestData.TABLE); + Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld); + Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew); + } + private static InMemoryFileIO getInMemoryIo(IcebergCatalog catalog) { return (InMemoryFileIO) ((ExceptionMappingFileIO) catalog.getIo()).getInnerIo(); } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java index 25089d5c4..c66e88b37 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/IcebergCatalogViewTest.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Set; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewCatalogTests; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; @@ -67,12 +68,21 @@ import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.config.RealmEntityManagerFactory; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.TestPolarisEventListener; +import org.apache.polaris.service.quarkus.test.TestData; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; +import org.assertj.core.api.Assertions; import org.assertj.core.api.Assumptions; import org.assertj.core.configuration.PreferredAssumptionException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.io.TempDir; import org.mockito.Mockito; @@ -98,7 +108,9 @@ public class IcebergCatalogViewTest extends ViewCatalogTests<IcebergCatalog> { "polaris.features.defaults.\"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST\"", "true", "polaris.features.defaults.\"SUPPORTED_CATALOG_STORAGE_TYPES\"", - "[\"FILE\"]"); + "[\"FILE\"]", + "polaris.event-listener.type", + "test"); } } @@ -116,6 +128,7 @@ public class IcebergCatalogViewTest extends ViewCatalogTests<IcebergCatalog> { @Inject UserSecretsManagerFactory userSecretsManagerFactory; @Inject PolarisConfigurationStore configurationStore; @Inject PolarisDiagnostics diagServices; + @Inject PolarisEventListener polarisEventListener; private IcebergCatalog catalog; @@ -124,6 +137,8 @@ public class IcebergCatalogViewTest extends ViewCatalogTests<IcebergCatalog> { private UserSecretsManager userSecretsManager; private PolarisCallContext polarisContext; + private TestPolarisEventListener testPolarisEventListener; + @BeforeAll public static void setUpMocks() { PolarisStorageIntegrationProviderImpl mock = @@ -212,6 +227,8 @@ public class IcebergCatalogViewTest extends ViewCatalogTests<IcebergCatalog> { FileIOFactory fileIOFactory = new DefaultFileIOFactory( new RealmEntityManagerFactory(managerFactory), managerFactory, configurationStore); + + testPolarisEventListener = (TestPolarisEventListener) polarisEventListener; this.catalog = new IcebergCatalog( entityManager, @@ -220,7 +237,8 @@ public class IcebergCatalogViewTest extends ViewCatalogTests<IcebergCatalog> { passthroughView, securityContext, Mockito.mock(), - fileIOFactory); + fileIOFactory, + polarisEventListener); Map<String, String> properties = ImmutableMap.<String, String>builder() .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO") @@ -249,4 +267,39 @@ public class IcebergCatalogViewTest extends ViewCatalogTests<IcebergCatalog> { protected boolean requiresNamespaceCreate() { return true; } + + @Test + public void testEventsAreEmitted() { + IcebergCatalog catalog = catalog(); + catalog.createNamespace(TestData.NAMESPACE); + View view = + catalog + .buildView(TestData.TABLE) + .withDefaultNamespace(TestData.NAMESPACE) + .withSchema(TestData.SCHEMA) + .withQuery("a", "b") + .create(); + + String key = "foo"; + String valOld = "bar1"; + String valNew = "bar2"; + view.updateProperties().set(key, valOld).commit(); + view.updateProperties().set(key, valNew).commit(); + + var beforeRefreshEvent = testPolarisEventListener.getLatest(BeforeViewRefreshedEvent.class); + Assertions.assertThat(beforeRefreshEvent.viewIdentifier()).isEqualTo(TestData.TABLE); + + var afterRefreshEvent = testPolarisEventListener.getLatest(AfterViewRefreshedEvent.class); + Assertions.assertThat(afterRefreshEvent.viewIdentifier()).isEqualTo(TestData.TABLE); + + var beforeCommitEvent = testPolarisEventListener.getLatest(BeforeViewCommitedEvent.class); + Assertions.assertThat(beforeCommitEvent.identifier()).isEqualTo(TestData.TABLE); + Assertions.assertThat(beforeCommitEvent.base().properties().get(key)).isEqualTo(valOld); + Assertions.assertThat(beforeCommitEvent.metadata().properties().get(key)).isEqualTo(valNew); + + var afterCommitEvent = testPolarisEventListener.getLatest(AfterViewCommitedEvent.class); + Assertions.assertThat(afterCommitEvent.identifier()).isEqualTo(TestData.TABLE); + Assertions.assertThat(afterCommitEvent.base().properties().get(key)).isEqualTo(valOld); + Assertions.assertThat(afterCommitEvent.metadata().properties().get(key)).isEqualTo(valNew); + } } diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java index a5bf98ba9..68e2c35e0 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java @@ -94,6 +94,7 @@ import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.policy.PolicyCatalog; import org.apache.polaris.service.config.RealmEntityManagerFactory; +import org.apache.polaris.service.events.NoOpPolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.ApplicablePolicy; @@ -287,7 +288,8 @@ public class PolicyCatalogTest { passthroughView, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + new NoOpPolarisEventListener()); this.icebergCatalog.initialize( CATALOG_NAME, ImmutableMap.of( diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java index a5f66ec5b..48aa45001 100644 --- a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/ratelimiter/RateLimiterFilterTest.java @@ -31,6 +31,9 @@ import java.time.Duration; import java.util.Map; import java.util.Set; import java.util.function.Consumer; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.TestPolarisEventListener; import org.apache.polaris.service.quarkus.ratelimiter.RateLimiterFilterTest.Profile; import org.apache.polaris.service.quarkus.test.PolarisIntegrationTestFixture; import org.apache.polaris.service.quarkus.test.PolarisIntegrationTestHelper; @@ -81,7 +84,9 @@ public class RateLimiterFilterTest { "polaris.authentication.token-broker.type", "symmetric-key", "polaris.authentication.token-broker.symmetric-key.secret", - "secret"); + "secret", + "polaris.event-listener.type", + "test"); } } @@ -90,6 +95,7 @@ public class RateLimiterFilterTest { @Inject PolarisIntegrationTestHelper helper; @Inject MeterRegistry meterRegistry; + @Inject PolarisEventListener polarisEventListener; private TestEnvironment testEnv; private PolarisIntegrationTestFixture fixture; @@ -145,6 +151,11 @@ public class RateLimiterFilterTest { } requestAsserter.accept(Status.TOO_MANY_REQUESTS); + BeforeRequestRateLimitedEvent event = + ((TestPolarisEventListener) polarisEventListener) + .getLatest(BeforeRequestRateLimitedEvent.class); + assertThat(event.method()).isEqualTo("GET"); + // Examples of expected metrics: // http_server_requests_seconds_count{application="Polaris",environment="prod",method="GET",outcome="CLIENT_ERROR",realm_id="org_apache_polaris_service_ratelimiter_RateLimiterFilterTest",status="429",uri="/api/management/v1/principal-roles"} 1.0 // polaris_principal_roles_listPrincipalRoles_seconds_count{application="Polaris",class="org.apache.polaris.service.admin.api.PolarisPrincipalRolesApi",environment="prod",exception="none",method="listPrincipalRoles"} 50.0 diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/TestData.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/TestData.java new file mode 100644 index 000000000..8c061f201 --- /dev/null +++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/test/TestData.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.quarkus.test; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types; + +/** Contains test data that can be reused across tests */ +public class TestData { + public static final Namespace NAMESPACE = Namespace.of("newdb"); + public static final TableIdentifier TABLE = TableIdentifier.of(NAMESPACE, "table"); + public static final Schema SCHEMA = + new Schema( + required(3, "id", Types.IntegerType.get(), "unique ID 🤪"), + required(4, "data", Types.StringType.get())); +} diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index afa7d1477..c66def885 100644 --- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -123,6 +123,15 @@ import org.apache.polaris.core.storage.StorageLocation; import org.apache.polaris.service.catalog.SupportsNotifications; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.FileIOUtil; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.NotificationRequest; import org.apache.polaris.service.types.NotificationType; @@ -171,6 +180,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog private final CatalogEntity catalogEntity; private final TaskExecutor taskExecutor; private final SecurityContext securityContext; + private final PolarisEventListener polarisEventListener; + private String ioImplClassName; private FileIO catalogFileIO; private final String catalogName; @@ -197,7 +208,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog PolarisResolutionManifestCatalogView resolvedEntityView, SecurityContext securityContext, TaskExecutor taskExecutor, - FileIOFactory fileIOFactory) { + FileIOFactory fileIOFactory, + PolarisEventListener polarisEventListener) { this.entityManager = entityManager; this.callContext = callContext; this.resolvedEntityView = resolvedEntityView; @@ -209,6 +221,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog this.catalogName = catalogEntity.getName(); this.fileIOFactory = fileIOFactory; this.metaStoreManager = metaStoreManager; + this.polarisEventListener = polarisEventListener; } @Override @@ -1319,6 +1332,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog if (latestLocation == null) { disableRefresh(); } else { + polarisEventListener.onBeforeTableRefreshed(new BeforeTableRefreshedEvent(tableIdentifier)); refreshFromMetadataLocation( latestLocation, SHOULD_RETRY_REFRESH_PREDICATE, @@ -1338,10 +1352,14 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog Set.of(PolarisStorageActions.READ)); return TableMetadataParser.read(fileIO, metadataLocation); }); + polarisEventListener.onAfterTableRefreshed(new AfterTableRefreshedEvent(tableIdentifier)); } } public void doCommit(TableMetadata base, TableMetadata metadata) { + polarisEventListener.onBeforeTableCommited( + new BeforeTableCommitedEvent(tableIdentifier, base, metadata)); + LOGGER.debug( "doCommit for table {} with base {}, metadata {}", tableIdentifier, base, metadata); // TODO: Maybe avoid writing metadata if there's definitely a transaction conflict @@ -1494,6 +1512,9 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog } else { updateTableLike(tableIdentifier, entity); } + + polarisEventListener.onAfterTableCommited( + new AfterTableCommitedEvent(tableIdentifier, base, metadata)); } @Override @@ -1684,6 +1705,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog if (latestLocation == null) { disableRefresh(); } else { + polarisEventListener.onBeforeViewRefreshed(new BeforeViewRefreshedEvent(identifier)); refreshFromMetadataLocation( latestLocation, SHOULD_RETRY_REFRESH_PREDICATE, @@ -1705,10 +1727,14 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog return ViewMetadataParser.read(fileIO.newInputFile(metadataLocation)); }); + polarisEventListener.onAfterViewRefreshed(new AfterViewRefreshedEvent(identifier)); } } public void doCommit(ViewMetadata base, ViewMetadata metadata) { + polarisEventListener.onBeforeViewCommited( + new BeforeViewCommitedEvent(identifier, base, metadata)); + // TODO: Maybe avoid writing metadata if there's definitely a transaction conflict LOGGER.debug("doCommit for view {} with base {}, metadata {}", identifier, base, metadata); if (null == base && !namespaceExists(identifier.namespace())) { @@ -1802,6 +1828,9 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog } else { updateTableLike(identifier, entity); } + + polarisEventListener.onAfterViewCommited( + new AfterViewCommitedEvent(identifier, base, metadata)); } protected String writeNewMetadataIfRequired(ViewMetadata metadata) { diff --git a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java index 94fe19760..b1d28ce89 100644 --- a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java +++ b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java @@ -38,6 +38,7 @@ import org.apache.polaris.core.secrets.UserSecretsManagerFactory; import org.apache.polaris.service.catalog.iceberg.IcebergCatalog; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.config.RealmEntityManagerFactory; +import org.apache.polaris.service.events.PolarisEventListener; import org.apache.polaris.service.task.TaskExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +56,7 @@ public class PolarisCallContextCatalogFactory implements CallContextCatalogFacto private final FileIOFactory fileIOFactory; private final MetaStoreManagerFactory metaStoreManagerFactory; private final UserSecretsManagerFactory userSecretsManagerFactory; + private final PolarisEventListener polarisEventListener; @Inject public PolarisCallContextCatalogFactory( @@ -62,12 +64,14 @@ public class PolarisCallContextCatalogFactory implements CallContextCatalogFacto MetaStoreManagerFactory metaStoreManagerFactory, UserSecretsManagerFactory userSecretsManagerFactory, TaskExecutor taskExecutor, - FileIOFactory fileIOFactory) { + FileIOFactory fileIOFactory, + PolarisEventListener polarisEventListener) { this.entityManagerFactory = entityManagerFactory; this.metaStoreManagerFactory = metaStoreManagerFactory; this.userSecretsManagerFactory = userSecretsManagerFactory; this.taskExecutor = taskExecutor; this.fileIOFactory = fileIOFactory; + this.polarisEventListener = polarisEventListener; } @Override @@ -95,7 +99,8 @@ public class PolarisCallContextCatalogFactory implements CallContextCatalogFacto resolvedManifest, securityContext, taskExecutor, - fileIOFactory); + fileIOFactory, + polarisEventListener); context.contextVariables().put(CallContext.REQUEST_PATH_CATALOG_INSTANCE_KEY, catalogInstance); diff --git a/service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java new file mode 100644 index 000000000..c952997df --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Emitted after Polaris performs a commit to a table. This is not emitted if there's an exception + * while committing. + * + * @param identifier The identifier. + * @param base The old metadata. + * @param metadata The new metadata. + */ +public record AfterTableCommitedEvent( + TableIdentifier identifier, TableMetadata base, TableMetadata metadata) + implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java new file mode 100644 index 000000000..be38a8baa --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Emitted after Polaris refreshes its known version of a table's metadata by fetching the latest. + * + * @param tableIdentifier The identifier of the table that was refreshed. + */ +public record AfterTableRefreshedEvent(TableIdentifier tableIdentifier) implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java new file mode 100644 index 000000000..638ba84fb --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.polaris.core.context.CallContext; + +/** + * Emitted after an attempt of an async task, such as manifest file cleanup, finishes. + * + * @param taskEntityId The ID of the TaskEntity. + * @param callContext The CallContext the task is being executed under. + * @param attempt The attempt number. Each retry of the task will have its own attempt number. The + * initial (non-retried) attempt starts counting from 1. + * @param success Whether or not the attempt succeeded. + */ +public record AfterTaskAttemptedEvent( + long taskEntityId, CallContext callContext, int attempt, boolean success) + implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java new file mode 100644 index 000000000..eb2ca2414 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.view.ViewMetadata; + +/** + * Emitted after Polaris performs a commit to a view. This is not emitted if there's an exception + * while committing. + * + * @param identifier The identifier. + * @param base The old metadata. + * @param metadata The new metadata. + */ +public record AfterViewCommitedEvent( + TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java new file mode 100644 index 000000000..249220ddd --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Emitted after Polaris refreshes its known version of a view's metadata by fetching the latest. + * + * @param viewIdentifier The identifier of the view that was refreshed. + */ +public record AfterViewRefreshedEvent(TableIdentifier viewIdentifier) implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java new file mode 100644 index 000000000..1d9780ebe --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +/** + * Emitted before the RateLimiterFilter rejects a request due to exceeding the rate limit. + * + * @param method The request's HTTP method + * @param absolutePath The request's absolute path + */ +public record BeforeRequestRateLimitedEvent(String method, String absolutePath) + implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java new file mode 100644 index 000000000..2bcc49ab6 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Emitted when Polaris intends to perform a commit to a table. There is no guarantee on the order + * of this event relative to the validation checks we've performed, which means the commit may still + * fail Polaris-side validation checks. + * + * @param identifier The identifier. + * @param base The old metadata. + * @param metadata The new metadata. + */ +public record BeforeTableCommitedEvent( + TableIdentifier identifier, TableMetadata base, TableMetadata metadata) + implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java new file mode 100644 index 000000000..f319298f5 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Emitted when Polaris intends to refresh its known version of a table's metadata by fetching the + * latest. + * + * @param tableIdentifier The identifier of the table being refreshed. + */ +public record BeforeTableRefreshedEvent(TableIdentifier tableIdentifier) implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java new file mode 100644 index 000000000..a7fa7231e --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.polaris.core.context.CallContext; + +/** + * Emitted before an attempt of an async task, such as manifest file cleanup, begins. + * + * @param taskEntityId The ID of the TaskEntity + * @param callContext The CallContext the task is being executed under. + * @param attempt The attempt number. Each retry of the task will have its own attempt number. The + * initial (non-retried) attempt starts counting from 1. + */ +public record BeforeTaskAttemptedEvent(long taskEntityId, CallContext callContext, int attempt) + implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java new file mode 100644 index 000000000..16e460d80 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.view.ViewMetadata; + +/** + * Emitted when Polaris intends to perform a commit to a view. There is no guarantee on the order of + * this event relative to the validation checks we've performed, which means the commit may still + * fail Polaris-side validation checks. + * + * @param identifier The identifier. + * @param base The old metadata. + * @param metadata The new metadata. + */ +public record BeforeViewCommitedEvent( + TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java new file mode 100644 index 000000000..6f58d2ca2 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * Emitted when Polaris intends to refresh its known version of a view's metadata by fetching the + * latest. + * + * @param viewIdentifier The identifier of the view being refreshed. + */ +public record BeforeViewRefreshedEvent(TableIdentifier viewIdentifier) implements PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java b/service/common/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java new file mode 100644 index 000000000..f31fbcef5 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/NoOpPolarisEventListener.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; + +/** Event listener that does nothing. */ +@ApplicationScoped +@Identifier("no-op") +public class NoOpPolarisEventListener extends PolarisEventListener {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/PolarisEvent.java b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEvent.java new file mode 100644 index 000000000..4922c02f4 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEvent.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +/** + * Represents an event emitted by Polaris. Currently there's no common data across events so this is + * just a marker interface. * + */ +public interface PolarisEvent {} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java new file mode 100644 index 000000000..485766bb2 --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +/** + * Represents an event listener that can respond to notable moments during Polaris's execution. + * Event details are documented under the event objects themselves. + */ +public abstract class PolarisEventListener { + /** {@link BeforeRequestRateLimitedEvent} */ + public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {} + + /** {@link BeforeTableCommitedEvent} */ + public void onBeforeTableCommited(BeforeTableCommitedEvent event) {} + + /** {@link AfterTableCommitedEvent} */ + public void onAfterTableCommited(AfterTableCommitedEvent event) {} + + /** {@link BeforeViewCommitedEvent} */ + public void onBeforeViewCommited(BeforeViewCommitedEvent event) {} + + /** {@link AfterViewCommitedEvent} */ + public void onAfterViewCommited(AfterViewCommitedEvent event) {} + + /** {@link BeforeTableRefreshedEvent} */ + public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {} + + /** {@link AfterTableRefreshedEvent} */ + public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {} + + /** {@link BeforeViewRefreshedEvent} */ + public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {} + + /** {@link AfterViewRefreshedEvent} */ + public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {} + + /** {@link BeforeTaskAttemptedEvent} */ + public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {} + + /** {@link AfterTaskAttemptedEvent} */ + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {} +} diff --git a/service/common/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java b/service/common/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java new file mode 100644 index 000000000..668edc7fa --- /dev/null +++ b/service/common/src/main/java/org/apache/polaris/service/events/TestPolarisEventListener.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import com.google.common.collect.Streams; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import java.util.ArrayList; +import java.util.List; + +/** Event listener that stores all emitted events forever. Not recommended for use in production. */ +@ApplicationScoped +@Identifier("test") +public class TestPolarisEventListener extends PolarisEventListener { + private final List<PolarisEvent> history = new ArrayList<>(); + + public <T> T getLatest(Class<T> type) { + return (T) Streams.findLast(history.stream().filter(type::isInstance)).orElseThrow(); + } + + @Override + public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) { + history.add(event); + } + + @Override + public void onBeforeTableCommited(BeforeTableCommitedEvent event) { + history.add(event); + } + + @Override + public void onAfterTableCommited(AfterTableCommitedEvent event) { + history.add(event); + } + + @Override + public void onBeforeViewCommited(BeforeViewCommitedEvent event) { + history.add(event); + } + + @Override + public void onAfterViewCommited(AfterViewCommitedEvent event) { + history.add(event); + } + + @Override + public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) { + history.add(event); + } + + @Override + public void onAfterTableRefreshed(AfterTableRefreshedEvent event) { + history.add(event); + } + + @Override + public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) { + history.add(event); + } + + @Override + public void onAfterViewRefreshed(AfterViewRefreshedEvent event) { + history.add(event); + } + + @Override + public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) { + history.add(event); + } + + @Override + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) { + history.add(event); + } +} diff --git a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java index 28bb60589..7b0fda93f 100644 --- a/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java +++ b/service/common/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java @@ -28,6 +28,8 @@ import jakarta.ws.rs.core.Response; import jakarta.ws.rs.ext.Provider; import java.io.IOException; import org.apache.polaris.service.config.PolarisFilterPriorities; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.PolarisEventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,16 +42,21 @@ public class RateLimiterFilter implements ContainerRequestFilter { private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterFilter.class); private final RateLimiter rateLimiter; + private final PolarisEventListener polarisEventListener; @Inject - public RateLimiterFilter(RateLimiter rateLimiter) { + public RateLimiterFilter(RateLimiter rateLimiter, PolarisEventListener polarisEventListener) { this.rateLimiter = rateLimiter; + this.polarisEventListener = polarisEventListener; } /** Returns a 429 if the rate limiter says so. Otherwise, forwards the request along. */ @Override public void filter(ContainerRequestContext ctx) throws IOException { if (!rateLimiter.canProceed()) { + polarisEventListener.onBeforeRequestRateLimited( + new BeforeRequestRateLimitedEvent( + ctx.getMethod(), ctx.getUriInfo().getAbsolutePath().toString())); ctx.abortWith(Response.status(Response.Status.TOO_MANY_REQUESTS).build()); LOGGER.atDebug().log("Rate limiting request"); } diff --git a/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java b/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java index 6b42c3fc8..140931031 100644 --- a/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java +++ b/service/common/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java @@ -33,6 +33,9 @@ import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.PolarisEventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,14 +51,17 @@ public class TaskExecutorImpl implements TaskExecutor { private final MetaStoreManagerFactory metaStoreManagerFactory; private final TaskFileIOSupplier fileIOSupplier; private final List<TaskHandler> taskHandlers = new CopyOnWriteArrayList<>(); + private final PolarisEventListener polarisEventListener; public TaskExecutorImpl( Executor executor, MetaStoreManagerFactory metaStoreManagerFactory, - TaskFileIOSupplier fileIOSupplier) { + TaskFileIOSupplier fileIOSupplier, + PolarisEventListener polarisEventListener) { this.executor = executor; this.metaStoreManagerFactory = metaStoreManagerFactory; this.fileIOSupplier = fileIOSupplier; + this.polarisEventListener = polarisEventListener; } public void init() { @@ -111,45 +117,54 @@ public class TaskExecutorImpl implements TaskExecutor { } protected void handleTask(long taskEntityId, CallContext ctx, int attempt) { - // set the call context INSIDE the async task - CallContext.setCurrentContext(ctx); - LOGGER.info("Handling task entity id {}", taskEntityId); - PolarisMetaStoreManager metaStoreManager = - metaStoreManagerFactory.getOrCreateMetaStoreManager(ctx.getRealmContext()); - PolarisBaseEntity taskEntity = - metaStoreManager - .loadEntity(ctx.getPolarisCallContext(), 0L, taskEntityId, PolarisEntityType.TASK) - .getEntity(); - if (!PolarisEntityType.TASK.equals(taskEntity.getType())) { - throw new IllegalArgumentException("Provided taskId must be a task entity type"); - } - TaskEntity task = TaskEntity.of(taskEntity); - Optional<TaskHandler> handlerOpt = - taskHandlers.stream().filter(th -> th.canHandleTask(task)).findFirst(); - if (handlerOpt.isEmpty()) { - LOGGER - .atWarn() - .addKeyValue("taskEntityId", taskEntityId) - .addKeyValue("taskType", task.getTaskType()) - .log("Unable to find handler for task type"); - return; - } - TaskHandler handler = handlerOpt.get(); - boolean success = handler.handleTask(task, ctx); - if (success) { - LOGGER - .atInfo() - .addKeyValue("taskEntityId", taskEntityId) - .addKeyValue("handlerClass", handler.getClass()) - .log("Task successfully handled"); - metaStoreManager.dropEntityIfExists( - ctx.getPolarisCallContext(), null, taskEntity, Map.of(), false); - } else { - LOGGER - .atWarn() - .addKeyValue("taskEntityId", taskEntityId) - .addKeyValue("taskEntityName", taskEntity.getName()) - .log("Unable to execute async task"); + polarisEventListener.onBeforeTaskAttempted( + new BeforeTaskAttemptedEvent(taskEntityId, ctx, attempt)); + + boolean success = false; + try { + // set the call context INSIDE the async task + CallContext.setCurrentContext(ctx); + LOGGER.info("Handling task entity id {}", taskEntityId); + PolarisMetaStoreManager metaStoreManager = + metaStoreManagerFactory.getOrCreateMetaStoreManager(ctx.getRealmContext()); + PolarisBaseEntity taskEntity = + metaStoreManager + .loadEntity(ctx.getPolarisCallContext(), 0L, taskEntityId, PolarisEntityType.TASK) + .getEntity(); + if (!PolarisEntityType.TASK.equals(taskEntity.getType())) { + throw new IllegalArgumentException("Provided taskId must be a task entity type"); + } + TaskEntity task = TaskEntity.of(taskEntity); + Optional<TaskHandler> handlerOpt = + taskHandlers.stream().filter(th -> th.canHandleTask(task)).findFirst(); + if (handlerOpt.isEmpty()) { + LOGGER + .atWarn() + .addKeyValue("taskEntityId", taskEntityId) + .addKeyValue("taskType", task.getTaskType()) + .log("Unable to find handler for task type"); + return; + } + TaskHandler handler = handlerOpt.get(); + success = handler.handleTask(task, ctx); + if (success) { + LOGGER + .atInfo() + .addKeyValue("taskEntityId", taskEntityId) + .addKeyValue("handlerClass", handler.getClass()) + .log("Task successfully handled"); + metaStoreManager.dropEntityIfExists( + ctx.getPolarisCallContext(), null, taskEntity, Map.of(), false); + } else { + LOGGER + .atWarn() + .addKeyValue("taskEntityId", taskEntityId) + .addKeyValue("taskEntityName", taskEntity.getName()) + .log("Unable to execute async task"); + } + } finally { + polarisEventListener.onAfterTaskAttempted( + new AfterTaskAttemptedEvent(taskEntityId, ctx, attempt, success)); } } } diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java index 9a070273b..a6854703d 100644 --- a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java +++ b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java @@ -244,7 +244,8 @@ public class FileIOFactoryTest { passthroughView, services.securityContext(), services.taskExecutor(), - services.fileIOFactory()); + services.fileIOFactory(), + services.polarisEventListener()); polarisCatalog.initialize( CATALOG_NAME, ImmutableMap.of( diff --git a/service/common/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java b/service/common/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java new file mode 100644 index 000000000..1cc88fc10 --- /dev/null +++ b/service/common/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.task; + +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.TaskEntity; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.apache.polaris.service.TestServices; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.TestPolarisEventListener; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Unit tests for TaskExecutorImpl */ +public class TaskExecutorImplTest { + @Test + void testEventsAreEmitted() { + String realm = "myrealm"; + RealmContext realmContext = () -> realm; + + TestServices testServices = TestServices.builder().realmContext(realmContext).build(); + + TestPolarisEventListener testPolarisEventListener = + (TestPolarisEventListener) testServices.polarisEventListener(); + + MetaStoreManagerFactory metaStoreManagerFactory = testServices.metaStoreManagerFactory(); + PolarisMetaStoreManager metaStoreManager = + metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext); + BasePersistence bp = metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(); + + PolarisCallContext polarisCallCtx = + new PolarisCallContext(bp, testServices.polarisDiagnostics()); + CallContext callContext = CallContext.of(realmContext, polarisCallCtx); + + // This task doesn't have a type so it won't be handle-able by a real handler. We register a + // test TaskHandler below that can handle any task. + TaskEntity taskEntity = + new TaskEntity.Builder() + .setName("mytask") + .setId(metaStoreManager.generateNewEntityId(polarisCallCtx).getId()) + .build(); + metaStoreManager.createEntityIfNotExists(polarisCallCtx, null, taskEntity); + + int attempt = 1; + + TaskExecutorImpl executor = + new TaskExecutorImpl( + Runnable::run, + testServices.metaStoreManagerFactory(), + new TaskFileIOSupplier(testServices.fileIOFactory()), + testServices.polarisEventListener()); + executor.addTaskHandler( + new TaskHandler() { + @Override + public boolean canHandleTask(TaskEntity task) { + return true; + } + + @Override + public boolean handleTask(TaskEntity task, CallContext callContext) { + var beforeTaskAttemptedEvent = + testPolarisEventListener.getLatest(BeforeTaskAttemptedEvent.class); + Assertions.assertEquals(taskEntity.getId(), beforeTaskAttemptedEvent.taskEntityId()); + Assertions.assertEquals(callContext, beforeTaskAttemptedEvent.callContext()); + Assertions.assertEquals(attempt, beforeTaskAttemptedEvent.attempt()); + return true; + } + }); + + executor.handleTask(taskEntity.getId(), callContext, attempt); + + var afterAttemptTaskEvent = testPolarisEventListener.getLatest(AfterTaskAttemptedEvent.class); + Assertions.assertEquals(taskEntity.getId(), afterAttemptTaskEvent.taskEntityId()); + Assertions.assertEquals(callContext, afterAttemptTaskEvent.callContext()); + Assertions.assertEquals(attempt, afterAttemptTaskEvent.attempt()); + Assertions.assertTrue(afterAttemptTaskEvent.success()); + } +} diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java index 88406b895..cb48ad0a1 100644 --- a/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java +++ b/service/common/src/testFixtures/java/org/apache/polaris/service/TestServices.java @@ -56,6 +56,8 @@ import org.apache.polaris.service.config.DefaultConfigurationStore; import org.apache.polaris.service.config.RealmEntityManagerFactory; import org.apache.polaris.service.context.CallContextCatalogFactory; import org.apache.polaris.service.context.PolarisCallContextCatalogFactory; +import org.apache.polaris.service.events.PolarisEventListener; +import org.apache.polaris.service.events.TestPolarisEventListener; import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory; import org.apache.polaris.service.secrets.UnsafeInMemorySecretsManagerFactory; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; @@ -75,7 +77,8 @@ public record TestServices( RealmContext realmContext, SecurityContext securityContext, FileIOFactory fileIOFactory, - TaskExecutor taskExecutor) { + TaskExecutor taskExecutor, + PolarisEventListener polarisEventListener) { private static final RealmContext TEST_REALM = () -> "test-realm"; private static final String GCP_ACCESS_TOKEN = "abc"; @@ -175,13 +178,15 @@ public record TestServices( TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class); + PolarisEventListener polarisEventListener = new TestPolarisEventListener(); CallContextCatalogFactory callContextFactory = new PolarisCallContextCatalogFactory( realmEntityManagerFactory, metaStoreManagerFactory, userSecretsManagerFactory, taskExecutor, - fileIOFactory); + fileIOFactory, + polarisEventListener); IcebergCatalogAdapter service = new IcebergCatalogAdapter( @@ -252,7 +257,8 @@ public record TestServices( realmContext, securityContext, fileIOFactory, - taskExecutor); + taskExecutor, + polarisEventListener); } } }