This is an automated email from the ASF dual-hosted git repository.

singhpk234 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push: new d03c7174c Add Events for Iceberg REST APIs (#2480) d03c7174c is described below commit d03c7174ccf6771a0673e02b5e6d691d6cca9419 Author: Adnan Hemani <adna...@berkeley.edu> AuthorDate: Wed Sep 17 14:44:19 2025 -0700 Add Events for Iceberg REST APIs (#2480) --- .../polaris/service/admin/PolarisServiceImpl.java | 2 - .../service/catalog/iceberg/IcebergCatalog.java | 45 ++- .../catalog/iceberg/IcebergCatalogHandler.java | 8 +- .../IcebergRestCatalogEventServiceDelegator.java | 336 ++++++++++++++++++--- ...bergRestConfigurationEventServiceDelegator.java | 11 +- ...emptedEvent.java => AfterAttemptTaskEvent.java} | 2 +- .../service/events/AfterCatalogCreatedEvent.java | 23 -- .../service/events/AfterTableCommitedEvent.java | 35 --- .../service/events/AfterTableCreatedEvent.java | 28 -- .../service/events/AfterTableRefreshedEvent.java | 30 -- .../service/events/AfterViewCommitedEvent.java | 35 --- .../service/events/AfterViewRefreshedEvent.java | 30 -- ...mptedEvent.java => BeforeAttemptTaskEvent.java} | 2 +- ...Event.java => BeforeLimitRequestRateEvent.java} | 2 +- .../service/events/BeforeTableCommitedEvent.java | 35 --- .../service/events/BeforeTableRefreshedEvent.java | 31 -- .../service/events/BeforeViewCommitedEvent.java | 36 --- .../service/events/BeforeViewRefreshedEvent.java | 31 -- .../service/events/IcebergRestCatalogEvents.java | 251 +++++++++++++++ .../PropertyMapEventListener.java | 4 +- .../events/listeners/PolarisEventListener.java | 229 +++++++++++--- .../listeners/PolarisPersistenceEventListener.java | 68 ++--- .../events/listeners/TestPolarisEventListener.java | 37 +-- .../service/ratelimiter/RateLimiterFilter.java | 6 +- .../polaris/service/task/TaskExecutorImpl.java | 10 +- .../iceberg/AbstractIcebergCatalogTest.java | 23 +- .../iceberg/AbstractIcebergCatalogViewTest.java | 28 +- .../cloudwatch/AwsCloudWatchEventListenerTest.java | 17 +- 
.../service/ratelimiter/RateLimiterFilterTest.java | 6 +- .../polaris/service/task/TaskExecutorImplTest.java | 9 +- 30 files changed, 861 insertions(+), 549 deletions(-) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java index e8a1c5d99..48ad7027e 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java @@ -76,7 +76,6 @@ import org.apache.polaris.service.admin.api.PolarisCatalogsApiService; import org.apache.polaris.service.admin.api.PolarisPrincipalRolesApiService; import org.apache.polaris.service.admin.api.PolarisPrincipalsApiService; import org.apache.polaris.service.config.ReservedProperties; -import org.apache.polaris.service.events.AfterCatalogCreatedEvent; import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.types.PolicyIdentifier; import org.slf4j.Logger; @@ -131,7 +130,6 @@ public class PolarisServiceImpl validateExternalCatalog(catalog); Catalog newCatalog = CatalogEntity.of(adminService.createCatalog(request)).asCatalog(); LOGGER.info("Created new catalog {}", newCatalog); - polarisEventListener.onAfterCatalogCreated(new AfterCatalogCreatedEvent(newCatalog.getName())); return Response.status(Response.Status.CREATED).build(); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java index d7dc69b26..101fe3330 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java @@ -132,14 +132,7 @@ import 
org.apache.polaris.service.catalog.common.LocationUtils; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.FileIOUtil; import org.apache.polaris.service.catalog.validation.IcebergPropertiesValidation; -import org.apache.polaris.service.events.AfterTableCommitedEvent; -import org.apache.polaris.service.events.AfterTableRefreshedEvent; -import org.apache.polaris.service.events.AfterViewCommitedEvent; -import org.apache.polaris.service.events.AfterViewRefreshedEvent; -import org.apache.polaris.service.events.BeforeTableCommitedEvent; -import org.apache.polaris.service.events.BeforeTableRefreshedEvent; -import org.apache.polaris.service.events.BeforeViewCommitedEvent; -import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.task.TaskExecutor; import org.apache.polaris.service.types.NotificationRequest; @@ -1446,8 +1439,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog if (latestLocation == null) { disableRefresh(); } else { - polarisEventListener.onBeforeTableRefreshed( - new BeforeTableRefreshedEvent(catalogName, tableIdentifier)); + polarisEventListener.onBeforeRefreshTable( + new IcebergRestCatalogEvents.BeforeRefreshTableEvent(catalogName, tableIdentifier)); refreshFromMetadataLocation( latestLocation, SHOULD_RETRY_REFRESH_PREDICATE, @@ -1467,14 +1460,15 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog Set.of(PolarisStorageActions.READ, PolarisStorageActions.LIST)); return TableMetadataParser.read(fileIO, metadataLocation); }); - polarisEventListener.onAfterTableRefreshed( - new AfterTableRefreshedEvent(catalogName, tableIdentifier)); + polarisEventListener.onAfterRefreshTable( + new IcebergRestCatalogEvents.AfterRefreshTableEvent(catalogName, tableIdentifier)); } } public void 
doCommit(TableMetadata base, TableMetadata metadata) { - polarisEventListener.onBeforeTableCommited( - new BeforeTableCommitedEvent(tableIdentifier, base, metadata)); + polarisEventListener.onBeforeCommitTable( + new IcebergRestCatalogEvents.BeforeCommitTableEvent( + catalogName, tableIdentifier, base, metadata)); LOGGER.debug( "doCommit for table {} with metadataBefore {}, metadataAfter {}", @@ -1618,8 +1612,9 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog updateTableLike(tableIdentifier, entity); } - polarisEventListener.onAfterTableCommited( - new AfterTableCommitedEvent(catalogName, tableIdentifier, base, metadata)); + polarisEventListener.onAfterCommitTable( + new IcebergRestCatalogEvents.AfterCommitTableEvent( + catalogName, tableIdentifier, base, metadata)); } @Override @@ -1810,8 +1805,8 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog if (latestLocation == null) { disableRefresh(); } else { - polarisEventListener.onBeforeViewRefreshed( - new BeforeViewRefreshedEvent(catalogName, identifier)); + polarisEventListener.onBeforeRefreshView( + new IcebergRestCatalogEvents.BeforeRefreshViewEvent(catalogName, identifier)); refreshFromMetadataLocation( latestLocation, SHOULD_RETRY_REFRESH_PREDICATE, @@ -1833,14 +1828,15 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog return ViewMetadataParser.read(fileIO.newInputFile(metadataLocation)); }); - polarisEventListener.onAfterViewRefreshed( - new AfterViewRefreshedEvent(catalogName, identifier)); + polarisEventListener.onAfterRefreshView( + new IcebergRestCatalogEvents.AfterRefreshViewEvent(catalogName, identifier)); } } public void doCommit(ViewMetadata base, ViewMetadata metadata) { - polarisEventListener.onBeforeViewCommited( - new BeforeViewCommitedEvent(catalogName, identifier, base, metadata)); + polarisEventListener.onBeforeCommitView( + new IcebergRestCatalogEvents.BeforeCommitViewEvent( + catalogName, identifier, base, metadata)); // TODO: Maybe avoid writing 
metadata if there's definitely a transaction conflict LOGGER.debug( @@ -1940,8 +1936,9 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog updateTableLike(identifier, entity); } - polarisEventListener.onAfterViewCommited( - new AfterViewCommitedEvent(catalogName, identifier, base, metadata)); + polarisEventListener.onAfterCommitView( + new IcebergRestCatalogEvents.AfterCommitViewEvent( + catalogName, identifier, base, metadata)); } protected String writeNewMetadataIfRequired(ViewMetadata metadata) { diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 863e1aef5..b9c99ac3a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -102,7 +102,6 @@ import org.apache.polaris.service.catalog.SupportsNotifications; import org.apache.polaris.service.catalog.common.CatalogHandler; import org.apache.polaris.service.config.ReservedProperties; import org.apache.polaris.service.context.catalog.CallContextCatalogFactory; -import org.apache.polaris.service.events.AfterTableCreatedEvent; import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.http.IcebergHttpUtil; import org.apache.polaris.service.http.IfNoneMatch; @@ -392,11 +391,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab .withWriteOrder(request.writeOrder()) .setProperties(reservedProperties.removeReservedProperties(request.properties())) .build(); - LoadTableResponse resp = - catalogHandlerUtils.createTable(baseCatalog, namespace, requestWithoutReservedProperties); - polarisEventListener.onAfterTableCreated( - new AfterTableCreatedEvent(catalogName, identifier, resp.tableMetadata())); - return 
resp; + return catalogHandlerUtils.createTable( + baseCatalog, namespace, requestWithoutReservedProperties); } /** diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java index 43d1c1f00..edf15d484 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java @@ -25,6 +25,7 @@ import jakarta.decorator.Delegate; import jakarta.inject.Inject; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.SecurityContext; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; @@ -33,17 +34,75 @@ import org.apache.iceberg.rest.requests.RegisterTableRequest; import org.apache.iceberg.rest.requests.RenameTableRequest; import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.responses.CreateNamespaceResponse; +import org.apache.iceberg.rest.responses.GetNamespaceResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.rest.responses.LoadViewResponse; +import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.service.catalog.CatalogPrefixParser; import org.apache.polaris.service.catalog.api.IcebergRestCatalogApiService; +import org.apache.polaris.service.catalog.common.CatalogAdapter; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; +import 
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckExistsNamespaceEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckExistsTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCheckExistsViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCreateNamespaceEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCreateTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterCreateViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterDropNamespaceEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterDropTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterDropViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterListNamespacesEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterListTablesEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterListViewsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterLoadCredentialsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterLoadTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterLoadViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterRegisterTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterRenameTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterRenameViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterReplaceViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterSendNotificationEvent; +import 
org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.AfterUpdateTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCheckExistsNamespaceEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCheckExistsTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCheckExistsViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCreateNamespaceEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCreateTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeCreateViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeDropNamespaceEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeDropTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeDropViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeListNamespacesEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeListTablesEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeListViewsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeLoadCredentialsEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeLoadTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeLoadViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeRegisterTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeRenameTableEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeRenameViewEvent; +import 
org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeReplaceViewEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeSendNotificationEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents.BeforeUpdateTableEvent; +import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.types.CommitTableRequest; import org.apache.polaris.service.types.CommitViewRequest; import org.apache.polaris.service.types.NotificationRequest; @Decorator @Priority(1000) -public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatalogApiService { +public class IcebergRestCatalogEventServiceDelegator + implements IcebergRestCatalogApiService, CatalogAdapter { @Inject @Delegate IcebergCatalogAdapter delegate; + @Inject PolarisEventListener polarisEventListener; + @Inject CatalogPrefixParser prefixParser; @Override public Response createNamespace( @@ -51,7 +110,18 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal CreateNamespaceRequest createNamespaceRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.createNamespace(prefix, createNamespaceRequest, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + polarisEventListener.onBeforeCreateNamespace( + new BeforeCreateNamespaceEvent(catalogName, createNamespaceRequest)); + Response resp = + delegate.createNamespace(prefix, createNamespaceRequest, realmContext, securityContext); + CreateNamespaceResponse createNamespaceResponse = (CreateNamespaceResponse) resp.getEntity(); + polarisEventListener.onAfterCreateNamespace( + new AfterCreateNamespaceEvent( + catalogName, + createNamespaceResponse.namespace(), + createNamespaceResponse.properties())); + return resp; } @Override @@ -62,26 +132,51 @@ public 
class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal String parent, RealmContext realmContext, SecurityContext securityContext) { - return delegate.listNamespaces( - prefix, pageToken, pageSize, parent, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + polarisEventListener.onBeforeListNamespaces(new BeforeListNamespacesEvent(catalogName, parent)); + Response resp = + delegate.listNamespaces(prefix, pageToken, pageSize, parent, realmContext, securityContext); + polarisEventListener.onAfterListNamespaces(new AfterListNamespacesEvent(catalogName, parent)); + return resp; } @Override public Response loadNamespaceMetadata( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { - return delegate.loadNamespaceMetadata(prefix, namespace, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + polarisEventListener.onBeforeLoadNamespaceMetadata( + new BeforeLoadNamespaceMetadataEvent(catalogName, decodeNamespace(namespace))); + Response resp = + delegate.loadNamespaceMetadata(prefix, namespace, realmContext, securityContext); + GetNamespaceResponse getNamespaceResponse = (GetNamespaceResponse) resp.getEntity(); + polarisEventListener.onAfterLoadNamespaceMetadata( + new AfterLoadNamespaceMetadataEvent( + catalogName, getNamespaceResponse.namespace(), getNamespaceResponse.properties())); + return resp; } @Override public Response namespaceExists( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { - return delegate.namespaceExists(prefix, namespace, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeCheckExistsNamespace( + new BeforeCheckExistsNamespaceEvent(catalogName, namespaceObj)); + Response resp = 
delegate.namespaceExists(prefix, namespace, realmContext, securityContext); + polarisEventListener.onAfterCheckExistsNamespace( + new AfterCheckExistsNamespaceEvent(catalogName, namespaceObj)); + return resp; } @Override public Response dropNamespace( String prefix, String namespace, RealmContext realmContext, SecurityContext securityContext) { - return delegate.dropNamespace(prefix, namespace, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + polarisEventListener.onBeforeDropNamespace( + new BeforeDropNamespaceEvent(catalogName, decodeNamespace(namespace))); + Response resp = delegate.dropNamespace(prefix, namespace, realmContext, securityContext); + polarisEventListener.onAfterDropNamespace(new AfterDropNamespaceEvent(catalogName, namespace)); + return resp; } @Override @@ -91,8 +186,18 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.updateProperties( - prefix, namespace, updateNamespacePropertiesRequest, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeUpdateNamespaceProperties( + new BeforeUpdateNamespacePropertiesEvent( + catalogName, namespaceObj, updateNamespacePropertiesRequest)); + Response resp = + delegate.updateProperties( + prefix, namespace, updateNamespacePropertiesRequest, realmContext, securityContext); + polarisEventListener.onAfterUpdateNamespaceProperties( + new AfterUpdateNamespacePropertiesEvent( + catalogName, namespaceObj, (UpdateNamespacePropertiesResponse) resp.getEntity())); + return resp; } @Override @@ -103,8 +208,28 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal String accessDelegationMode, RealmContext 
realmContext, SecurityContext securityContext) { - return delegate.createTable( - prefix, namespace, createTableRequest, accessDelegationMode, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeCreateTable( + new BeforeCreateTableEvent( + catalogName, namespaceObj, createTableRequest, accessDelegationMode)); + Response resp = + delegate.createTable( + prefix, + namespace, + createTableRequest, + accessDelegationMode, + realmContext, + securityContext); + if (!createTableRequest.stageCreate()) { + polarisEventListener.onAfterCreateTable( + new AfterCreateTableEvent( + catalogName, + namespaceObj, + createTableRequest.name(), + (LoadTableResponse) resp.getEntity())); + } + return resp; } @Override @@ -115,8 +240,13 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal Integer pageSize, RealmContext realmContext, SecurityContext securityContext) { - return delegate.listTables( - prefix, namespace, pageToken, pageSize, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeListTables(new BeforeListTablesEvent(catalogName, namespaceObj)); + Response resp = + delegate.listTables(prefix, namespace, pageToken, pageSize, realmContext, securityContext); + polarisEventListener.onAfterListTables(new AfterListTablesEvent(catalogName, namespaceObj)); + return resp; } @Override @@ -129,15 +259,24 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal String snapshots, RealmContext realmContext, SecurityContext securityContext) { - return delegate.loadTable( - prefix, - namespace, - table, - accessDelegationMode, - ifNoneMatchString, - snapshots, - realmContext, - securityContext); + String catalogName = 
prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeLoadTable( + new BeforeLoadTableEvent( + catalogName, namespaceObj, table, accessDelegationMode, ifNoneMatchString, snapshots)); + Response resp = + delegate.loadTable( + prefix, + namespace, + table, + accessDelegationMode, + ifNoneMatchString, + snapshots, + realmContext, + securityContext); + polarisEventListener.onAfterLoadTable( + new AfterLoadTableEvent(catalogName, namespaceObj, (LoadTableResponse) resp.getEntity())); + return resp; } @Override @@ -147,7 +286,14 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal String table, RealmContext realmContext, SecurityContext securityContext) { - return delegate.tableExists(prefix, namespace, table, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeCheckExistsTable( + new BeforeCheckExistsTableEvent(catalogName, namespaceObj, table)); + Response resp = delegate.tableExists(prefix, namespace, table, realmContext, securityContext); + polarisEventListener.onAfterCheckExistsTable( + new AfterCheckExistsTableEvent(catalogName, namespaceObj, table)); + return resp; } @Override @@ -158,8 +304,15 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal Boolean purgeRequested, RealmContext realmContext, SecurityContext securityContext) { - return delegate.dropTable( - prefix, namespace, table, purgeRequested, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeDropTable( + new BeforeDropTableEvent(catalogName, namespaceObj, table, purgeRequested)); + Response resp = + delegate.dropTable(prefix, namespace, table, purgeRequested, 
realmContext, securityContext); + polarisEventListener.onAfterDropTable( + new AfterDropTableEvent(catalogName, namespaceObj, table, purgeRequested)); + return resp; } @Override @@ -169,8 +322,17 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal RegisterTableRequest registerTableRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.registerTable( - prefix, namespace, registerTableRequest, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeRegisterTable( + new BeforeRegisterTableEvent(catalogName, namespaceObj, registerTableRequest)); + Response resp = + delegate.registerTable( + prefix, namespace, registerTableRequest, realmContext, securityContext); + polarisEventListener.onAfterRegisterTable( + new AfterRegisterTableEvent( + catalogName, namespaceObj, (LoadTableResponse) resp.getEntity())); + return resp; } @Override @@ -179,7 +341,13 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal RenameTableRequest renameTableRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.renameTable(prefix, renameTableRequest, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + polarisEventListener.onBeforeRenameTable( + new BeforeRenameTableEvent(catalogName, renameTableRequest)); + Response resp = delegate.renameTable(prefix, renameTableRequest, realmContext, securityContext); + polarisEventListener.onAfterRenameTable( + new AfterRenameTableEvent(catalogName, renameTableRequest)); + return resp; } @Override @@ -190,8 +358,17 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal CommitTableRequest commitTableRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.updateTable( - 
prefix, namespace, table, commitTableRequest, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeUpdateTable( + new BeforeUpdateTableEvent(catalogName, namespaceObj, table, commitTableRequest)); + Response resp = + delegate.updateTable( + prefix, namespace, table, commitTableRequest, realmContext, securityContext); + polarisEventListener.onAfterUpdateTable( + new AfterUpdateTableEvent( + catalogName, namespaceObj, table, (LoadTableResponse) resp.getEntity())); + return resp; } @Override @@ -201,7 +378,15 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal CreateViewRequest createViewRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.createView(prefix, namespace, createViewRequest, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeCreateView( + new BeforeCreateViewEvent(catalogName, namespaceObj, createViewRequest)); + Response resp = + delegate.createView(prefix, namespace, createViewRequest, realmContext, securityContext); + polarisEventListener.onAfterCreateView( + new AfterCreateViewEvent(catalogName, namespaceObj, (LoadViewResponse) resp.getEntity())); + return resp; } @Override @@ -212,8 +397,13 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal Integer pageSize, RealmContext realmContext, SecurityContext securityContext) { - return delegate.listViews( - prefix, namespace, pageToken, pageSize, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeListViews(new BeforeListViewsEvent(catalogName, namespaceObj)); + Response resp = + 
delegate.listViews(prefix, namespace, pageToken, pageSize, realmContext, securityContext); + polarisEventListener.onAfterListViews(new AfterListViewsEvent(catalogName, namespaceObj)); + return resp; } @Override @@ -223,7 +413,15 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal String table, RealmContext realmContext, SecurityContext securityContext) { - return delegate.loadCredentials(prefix, namespace, table, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeLoadCredentials( + new BeforeLoadCredentialsEvent(catalogName, namespaceObj, table)); + Response resp = + delegate.loadCredentials(prefix, namespace, table, realmContext, securityContext); + polarisEventListener.onAfterLoadCredentials( + new AfterLoadCredentialsEvent(catalogName, namespaceObj, table)); + return resp; } @Override @@ -233,7 +431,13 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal String view, RealmContext realmContext, SecurityContext securityContext) { - return delegate.loadView(prefix, namespace, view, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeLoadView(new BeforeLoadViewEvent(catalogName, namespaceObj, view)); + Response resp = delegate.loadView(prefix, namespace, view, realmContext, securityContext); + polarisEventListener.onAfterLoadView( + new AfterLoadViewEvent(catalogName, namespaceObj, (LoadViewResponse) resp.getEntity())); + return resp; } @Override @@ -243,7 +447,14 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal String view, RealmContext realmContext, SecurityContext securityContext) { - return delegate.viewExists(prefix, namespace, view, realmContext, securityContext); + String 
catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeCheckExistsView( + new BeforeCheckExistsViewEvent(catalogName, namespaceObj, view)); + Response resp = delegate.viewExists(prefix, namespace, view, realmContext, securityContext); + polarisEventListener.onAfterCheckExistsView( + new AfterCheckExistsViewEvent(catalogName, namespaceObj, view)); + return resp; } @Override @@ -253,7 +464,12 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal String view, RealmContext realmContext, SecurityContext securityContext) { - return delegate.dropView(prefix, namespace, view, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeDropView(new BeforeDropViewEvent(catalogName, namespaceObj, view)); + Response resp = delegate.dropView(prefix, namespace, view, realmContext, securityContext); + polarisEventListener.onAfterDropView(new AfterDropViewEvent(catalogName, namespaceObj, view)); + return resp; } @Override @@ -262,7 +478,13 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal RenameTableRequest renameTableRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.renameView(prefix, renameTableRequest, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + polarisEventListener.onBeforeRenameView( + new BeforeRenameViewEvent(catalogName, renameTableRequest)); + Response resp = delegate.renameView(prefix, renameTableRequest, realmContext, securityContext); + polarisEventListener.onAfterRenameView( + new AfterRenameViewEvent(catalogName, renameTableRequest)); + return resp; } @Override @@ -273,8 +495,17 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal 
CommitViewRequest commitViewRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.replaceView( - prefix, namespace, view, commitViewRequest, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeReplaceView( + new BeforeReplaceViewEvent(catalogName, namespaceObj, view, commitViewRequest)); + Response resp = + delegate.replaceView( + prefix, namespace, view, commitViewRequest, realmContext, securityContext); + polarisEventListener.onAfterReplaceView( + new AfterReplaceViewEvent( + catalogName, namespaceObj, view, (LoadViewResponse) resp.getEntity())); + return resp; } @Override @@ -283,10 +514,19 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal CommitTransactionRequest commitTransactionRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.commitTransaction( - prefix, commitTransactionRequest, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + polarisEventListener.onBeforeCommitTransaction( + new IcebergRestCatalogEvents.BeforeCommitTransactionEvent( + catalogName, commitTransactionRequest)); + Response resp = + delegate.commitTransaction(prefix, commitTransactionRequest, realmContext, securityContext); + polarisEventListener.onAfterCommitTransaction( + new IcebergRestCatalogEvents.AfterCommitTransactionEvent( + catalogName, commitTransactionRequest)); + return resp; } + /** This API is currently a no-op in Polaris. 
*/ @Override public Response reportMetrics( String prefix, @@ -307,7 +547,15 @@ public class IcebergRestCatalogEventServiceDelegator implements IcebergRestCatal NotificationRequest notificationRequest, RealmContext realmContext, SecurityContext securityContext) { - return delegate.sendNotification( - prefix, namespace, table, notificationRequest, realmContext, securityContext); + String catalogName = prefixParser.prefixToCatalogName(realmContext, prefix); + Namespace namespaceObj = decodeNamespace(namespace); + polarisEventListener.onBeforeSendNotification( + new BeforeSendNotificationEvent(catalogName, namespaceObj, table, notificationRequest)); + Response resp = + delegate.sendNotification( + prefix, namespace, table, notificationRequest, realmContext, securityContext); + polarisEventListener.onAfterSendNotification( + new AfterSendNotificationEvent(catalogName, namespaceObj, table)); + return resp; } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java index 6e018c9c2..55b563e3c 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestConfigurationEventServiceDelegator.java @@ -25,8 +25,11 @@ import jakarta.decorator.Delegate; import jakarta.inject.Inject; import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.SecurityContext; +import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.service.catalog.api.IcebergRestConfigurationApiService; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; +import org.apache.polaris.service.events.listeners.PolarisEventListener; @Decorator 
@Priority(1000) @@ -34,10 +37,16 @@ public class IcebergRestConfigurationEventServiceDelegator implements IcebergRestConfigurationApiService { @Inject @Delegate IcebergCatalogAdapter delegate; + @Inject PolarisEventListener polarisEventListener; @Override public Response getConfig( String warehouse, RealmContext realmContext, SecurityContext securityContext) { - return delegate.getConfig(warehouse, realmContext, securityContext); + polarisEventListener.onBeforeGetConfig( + new IcebergRestCatalogEvents.BeforeGetConfigEvent(warehouse)); + Response resp = delegate.getConfig(warehouse, realmContext, securityContext); + polarisEventListener.onAfterGetConfig( + new IcebergRestCatalogEvents.AfterGetConfigEvent(resp.readEntity(ConfigResponse.class))); + return resp; } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterAttemptTaskEvent.java similarity index 93% rename from runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java rename to runtime/service/src/main/java/org/apache/polaris/service/events/AfterAttemptTaskEvent.java index d1ccfb196..0eb3ebf01 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTaskAttemptedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterAttemptTaskEvent.java @@ -26,5 +26,5 @@ package org.apache.polaris.service.events; * initial (non-retried) attempt starts counting from 1. * @param success Whether the attempt succeeded. 
*/ -public record AfterTaskAttemptedEvent(long taskEntityId, int attempt, boolean success) +public record AfterAttemptTaskEvent(long taskEntityId, int attempt, boolean success) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java deleted file mode 100644 index 0ed7cef48..000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.polaris.service.events; - -/** Emitted Polaris creates a catalog. 
*/ -public record AfterCatalogCreatedEvent(String catalogName) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java deleted file mode 100644 index a69d30448..000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.catalog.TableIdentifier; - -/** - * Emitted after Polaris performs a commit to a table. This is not emitted if there's an exception - * while committing. - * - * @param catalogName The catalog name where this table exists. - * @param identifier The identifier. - * @param base The old metadata. - * @param metadata The new metadata. 
- */ -public record AfterTableCommitedEvent( - String catalogName, TableIdentifier identifier, TableMetadata base, TableMetadata metadata) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java deleted file mode 100644 index dd8939a35..000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.polaris.service.events; - -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.catalog.TableIdentifier; - -/** Emitted when Polaris creates a table. 
*/ -public record AfterTableCreatedEvent( - String catalogName, TableIdentifier identifier, TableMetadata metadata) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java deleted file mode 100644 index caef8e3c5..000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterTableRefreshedEvent.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.catalog.TableIdentifier; - -/** - * Emitted after Polaris refreshes its known version of a table's metadata by fetching the latest. - * - * @param catalogName The name of the catalog where the table is located - * @param tableIdentifier The identifier of the table that was refreshed. 
- */ -public record AfterTableRefreshedEvent(String catalogName, TableIdentifier tableIdentifier) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java deleted file mode 100644 index a7d01ecc2..000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.view.ViewMetadata; - -/** - * Emitted after Polaris performs a commit to a view. This is not emitted if there's an exception - * while committing. - * - * @param catalogName The catalog name where the view is located - * @param identifier The identifier. - * @param base The old metadata. - * @param metadata The new metadata. 
- */ -public record AfterViewCommitedEvent( - String catalogName, TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java deleted file mode 100644 index b44fa2a51..000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/AfterViewRefreshedEvent.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.catalog.TableIdentifier; - -/** - * Emitted after Polaris refreshes its known version of a view's metadata by fetching the latest. - * - * @param catalogName - * @param viewIdentifier The identifier of the view that was refreshed. 
- */ -public record AfterViewRefreshedEvent(String catalogName, TableIdentifier viewIdentifier) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeAttemptTaskEvent.java similarity index 92% rename from runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java rename to runtime/service/src/main/java/org/apache/polaris/service/events/BeforeAttemptTaskEvent.java index fcfa4400a..0a5fad641 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTaskAttemptedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeAttemptTaskEvent.java @@ -25,4 +25,4 @@ package org.apache.polaris.service.events; * @param attempt The attempt number. Each retry of the task will have its own attempt number. The * initial (non-retried) attempt starts counting from 1. 
*/ -public record BeforeTaskAttemptedEvent(long taskEntityId, int attempt) implements PolarisEvent {} +public record BeforeAttemptTaskEvent(long taskEntityId, int attempt) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeLimitRequestRateEvent.java similarity index 93% rename from runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java rename to runtime/service/src/main/java/org/apache/polaris/service/events/BeforeLimitRequestRateEvent.java index 1d9780ebe..e20d83202 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeRequestRateLimitedEvent.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeLimitRequestRateEvent.java @@ -24,5 +24,5 @@ package org.apache.polaris.service.events; * @param method The request's HTTP method * @param absolutePath The request's absolute path */ -public record BeforeRequestRateLimitedEvent(String method, String absolutePath) +public record BeforeLimitRequestRateEvent(String method, String absolutePath) implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java deleted file mode 100644 index 511f6f097..000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableCommitedEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.catalog.TableIdentifier; - -/** - * Emitted when Polaris intends to perform a commit to a table. There is no guarantee on the order - * of this event relative to the validation checks we've performed, which means the commit may still - * fail Polaris-side validation checks. - * - * @param tableIdentifier The identifier. - * @param metadataBefore The old metadata. - * @param metadataAfter The new metadata. - */ -public record BeforeTableCommitedEvent( - TableIdentifier tableIdentifier, TableMetadata metadataBefore, TableMetadata metadataAfter) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java deleted file mode 100644 index f5daade51..000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeTableRefreshedEvent.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.catalog.TableIdentifier; - -/** - * Emitted when Polaris intends to refresh its known version of a table's metadata by fetching the - * latest. - * - * @param catalogName The name of the catalog where the view is located. - * @param tableIdentifier The identifier of the table being refreshed. - */ -public record BeforeTableRefreshedEvent(String catalogName, TableIdentifier tableIdentifier) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java deleted file mode 100644 index c9303de71..000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewCommitedEvent.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.view.ViewMetadata; - -/** - * Emitted when Polaris intends to perform a commit to a view. There is no guarantee on the order of - * this event relative to the validation checks we've performed, which means the commit may still - * fail Polaris-side validation checks. - * - * @param catalogName The name of the catalog where the view is located. - * @param identifier The identifier. - * @param base The old metadata. - * @param metadata The new metadata. - */ -public record BeforeViewCommitedEvent( - String catalogName, TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java b/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java deleted file mode 100644 index 32b3250c7..000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/BeforeViewRefreshedEvent.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.polaris.service.events; - -import org.apache.iceberg.catalog.TableIdentifier; - -/** - * Emitted when Polaris intends to refresh its known version of a view's metadata by fetching the - * latest. - * - * @param catalogName The name of the catalog where the view is located. - * @param viewIdentifier The identifier of the view being refreshed. - */ -public record BeforeViewRefreshedEvent(String catalogName, TableIdentifier viewIdentifier) - implements PolarisEvent {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java new file mode 100644 index 000000000..76d3cf298 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/IcebergRestCatalogEvents.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +import java.util.Map; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.CreateViewRequest; +import org.apache.iceberg.rest.requests.RegisterTableRequest; +import org.apache.iceberg.rest.requests.RenameTableRequest; +import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.responses.ConfigResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.rest.responses.LoadViewResponse; +import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.polaris.service.types.CommitTableRequest; +import org.apache.polaris.service.types.CommitViewRequest; +import org.apache.polaris.service.types.NotificationRequest; + +/** + * Event records for Iceberg REST Catalog operations. Each operation has corresponding "Before" and + * "After" event records. 
+ */ +public class IcebergRestCatalogEvents { + + // Namespace Events + public record BeforeCreateNamespaceEvent( + String catalogName, CreateNamespaceRequest createNamespaceRequest) {} + + public record AfterCreateNamespaceEvent( + String catalogName, Namespace namespace, Map<String, String> namespaceProperties) {} + + public record BeforeListNamespacesEvent(String catalogName, String parent) {} + + public record AfterListNamespacesEvent(String catalogName, String parent) {} + + public record BeforeLoadNamespaceMetadataEvent(String catalogName, Namespace namespace) {} + + public record AfterLoadNamespaceMetadataEvent( + String catalogName, Namespace namespace, Map<String, String> namespaceProperties) {} + + public record BeforeCheckExistsNamespaceEvent(String catalogName, Namespace namespace) {} + + public record AfterCheckExistsNamespaceEvent(String catalogName, Namespace namespace) {} + + public record BeforeDropNamespaceEvent(String catalogName, Namespace namespace) {} + + public record AfterDropNamespaceEvent(String catalogName, String namespace) {} + + public record BeforeUpdateNamespacePropertiesEvent( + String catalogName, + Namespace namespace, + UpdateNamespacePropertiesRequest updateNamespacePropertiesRequest) {} + + public record AfterUpdateNamespacePropertiesEvent( + String catalogName, + Namespace namespace, + UpdateNamespacePropertiesResponse updateNamespacePropertiesResponse) {} + + // Table Events + public record BeforeCreateTableEvent( + String catalogName, + Namespace namespace, + CreateTableRequest createTableRequest, + String accessDelegationMode) {} + + public record AfterCreateTableEvent( + String catalogName, + Namespace namespace, + String tableName, + LoadTableResponse loadTableResponse) {} + + public record BeforeListTablesEvent(String catalogName, Namespace namespace) {} + + public record AfterListTablesEvent(String catalogName, Namespace namespace) {} + + public record BeforeLoadTableEvent( + String catalogName, + Namespace namespace, + 
String table, + String accessDelegationMode, + String ifNoneMatchString, + String snapshots) {} + + public record AfterLoadTableEvent( + String catalogName, Namespace namespace, LoadTableResponse loadTableResponse) {} + + public record BeforeCheckExistsTableEvent( + String catalogName, Namespace namespace, String table) {} + + public record AfterCheckExistsTableEvent(String catalogName, Namespace namespace, String table) {} + + public record BeforeDropTableEvent( + String catalogName, Namespace namespace, String table, Boolean purgeRequested) {} + + public record AfterDropTableEvent( + String catalogName, Namespace namespace, String table, Boolean purgeRequested) {} + + public record BeforeRegisterTableEvent( + String catalogName, Namespace namespace, RegisterTableRequest registerTableRequest) {} + + public record AfterRegisterTableEvent( + String catalogName, Namespace namespace, LoadTableResponse loadTableResponse) {} + + public record BeforeRenameTableEvent(String catalogName, RenameTableRequest renameTableRequest) {} + + public record AfterRenameTableEvent(String catalogName, RenameTableRequest renameTableRequest) {} + + public record BeforeUpdateTableEvent( + String catalogName, + Namespace namespace, + String sourceTable, + CommitTableRequest commitTableRequest) {} + + public record AfterUpdateTableEvent( + String catalogName, + Namespace namespace, + String sourceTable, + LoadTableResponse loadTableResponse) {} + + // View Events + public record BeforeCreateViewEvent( + String catalogName, Namespace namespace, CreateViewRequest createViewRequest) {} + + public record AfterCreateViewEvent( + String catalogName, Namespace namespace, LoadViewResponse loadViewResponse) {} + + public record BeforeListViewsEvent(String catalogName, Namespace namespace) {} + + public record AfterListViewsEvent(String catalogName, Namespace namespace) {} + + public record BeforeLoadViewEvent(String catalogName, Namespace namespace, String view) {} + + public record 
AfterLoadViewEvent( + String catalogName, Namespace namespace, LoadViewResponse loadViewResponse) {} + + public record BeforeCheckExistsViewEvent(String catalogName, Namespace namespace, String view) {} + + public record AfterCheckExistsViewEvent(String catalogName, Namespace namespace, String view) {} + + public record BeforeDropViewEvent(String catalogName, Namespace namespace, String view) {} + + public record AfterDropViewEvent(String catalogName, Namespace namespace, String view) {} + + public record BeforeRenameViewEvent(String catalogName, RenameTableRequest renameTableRequest) {} + + public record AfterRenameViewEvent(String catalogName, RenameTableRequest renameTableRequest) {} + + public record BeforeReplaceViewEvent( + String catalogName, + Namespace namespace, + String sourceView, + CommitViewRequest commitViewRequest) {} + + public record AfterReplaceViewEvent( + String catalogName, + Namespace namespace, + String sourceView, + LoadViewResponse loadViewResponse) {} + + // Credential Events + public record BeforeLoadCredentialsEvent(String catalogName, Namespace namespace, String table) {} + + public record AfterLoadCredentialsEvent(String catalogName, Namespace namespace, String table) {} + + // Transaction Events + public record BeforeCommitTransactionEvent( + String catalogName, CommitTransactionRequest commitTransactionRequest) {} + + // TODO: Add all PolarisEntities that were modified with this transaction. + public record AfterCommitTransactionEvent( + String catalogName, CommitTransactionRequest commitTransactionRequest) {} + + // Notification Events + public record BeforeSendNotificationEvent( + String catalogName, + Namespace namespace, + String table, + NotificationRequest notificationRequest) {} + + // TODO: Add result once SendNotification API changes are confirmed to return the result. 
+ public record AfterSendNotificationEvent(String catalogName, Namespace namespace, String table) {} + + // Configuration Events + public record BeforeGetConfigEvent(String warehouse) {} + + public record AfterGetConfigEvent(ConfigResponse configResponse) {} + + // Legacy events + public record BeforeCommitTableEvent( + String catalogName, + TableIdentifier identifier, + TableMetadata metadataBefore, + TableMetadata metadataAfter) + implements PolarisEvent {} + + public record AfterCommitTableEvent( + String catalogName, + TableIdentifier identifier, + TableMetadata metadataBefore, + TableMetadata metadataAfter) + implements PolarisEvent {} + + public record BeforeCommitViewEvent( + String catalogName, + TableIdentifier identifier, + ViewMetadata metadataBefore, + ViewMetadata metadataAfter) + implements PolarisEvent {} + + public record AfterCommitViewEvent( + String catalogName, + TableIdentifier identifier, + ViewMetadata metadataBefore, + ViewMetadata metadataAfter) + implements PolarisEvent {} + + public record BeforeRefreshTableEvent(String catalogName, TableIdentifier tableIdentifier) + implements PolarisEvent {} + + public record AfterRefreshTableEvent(String catalogName, TableIdentifier tableIdentifier) + implements PolarisEvent {} + + public record BeforeRefreshViewEvent(String catalogName, TableIdentifier viewIdentifier) + implements PolarisEvent {} + + public record AfterRefreshViewEvent(String catalogName, TableIdentifier viewIdentifier) + implements PolarisEvent {} +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java index 93eec9926..3d23129e6 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java +++ 
b/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java @@ -20,7 +20,7 @@ package org.apache.polaris.service.events.jsonEventListener; import java.util.HashMap; -import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.listeners.PolarisEventListener; /** @@ -34,7 +34,7 @@ public abstract class PropertyMapEventListener extends PolarisEventListener { protected abstract void transformAndSendEvent(HashMap<String, Object> properties); @Override - public void onAfterTableRefreshed(AfterTableRefreshedEvent event) { + public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) { HashMap<String, Object> properties = new HashMap<>(); properties.put("event_type", event.getClass().getSimpleName()); properties.put("table_identifier", event.tableIdentifier().toString()); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java index 5e1122d25..20dc304d0 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java @@ -18,66 +18,217 @@ */ package org.apache.polaris.service.events.listeners; -import org.apache.polaris.service.events.AfterCatalogCreatedEvent; -import org.apache.polaris.service.events.AfterTableCommitedEvent; -import org.apache.polaris.service.events.AfterTableCreatedEvent; -import org.apache.polaris.service.events.AfterTableRefreshedEvent; -import org.apache.polaris.service.events.AfterTaskAttemptedEvent; -import org.apache.polaris.service.events.AfterViewCommitedEvent; -import org.apache.polaris.service.events.AfterViewRefreshedEvent; -import 
org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; -import org.apache.polaris.service.events.BeforeTableCommitedEvent; -import org.apache.polaris.service.events.BeforeTableRefreshedEvent; -import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; -import org.apache.polaris.service.events.BeforeViewCommitedEvent; -import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.AfterAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeLimitRequestRateEvent; import org.apache.polaris.service.events.CatalogGenericTableServiceEvents; import org.apache.polaris.service.events.CatalogPolicyServiceEvents; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; /** * Represents an event listener that can respond to notable moments during Polaris's execution. * Event details are documented under the event objects themselves. */ public abstract class PolarisEventListener { + /** {@link BeforeLimitRequestRateEvent} */ + public void onBeforeLimitRequestRate(BeforeLimitRequestRateEvent event) {} - /** {@link BeforeRequestRateLimitedEvent} */ - public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {} + /** {@link IcebergRestCatalogEvents.BeforeCommitTableEvent} */ + public void onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) {} - /** {@link BeforeTableCommitedEvent} */ - public void onBeforeTableCommited(BeforeTableCommitedEvent event) {} + /** {@link IcebergRestCatalogEvents.AfterCommitTableEvent} */ + public void onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) {} - /** {@link AfterTableCommitedEvent} */ - public void onAfterTableCommited(AfterTableCommitedEvent event) {} + /** {@link IcebergRestCatalogEvents.BeforeCommitViewEvent} */ + public void onBeforeCommitView(IcebergRestCatalogEvents.BeforeCommitViewEvent event) {} - /** {@link 
BeforeViewCommitedEvent} */ - public void onBeforeViewCommited(BeforeViewCommitedEvent event) {} + /** {@link IcebergRestCatalogEvents.AfterCommitViewEvent} */ + public void onAfterCommitView(IcebergRestCatalogEvents.AfterCommitViewEvent event) {} - /** {@link AfterViewCommitedEvent} */ - public void onAfterViewCommited(AfterViewCommitedEvent event) {} + /** {@link IcebergRestCatalogEvents.BeforeRefreshTableEvent} */ + public void onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) {} - /** {@link BeforeTableRefreshedEvent} */ - public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {} + /** {@link IcebergRestCatalogEvents.AfterRefreshTableEvent} */ + public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) {} - /** {@link AfterTableRefreshedEvent} */ - public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {} + /** {@link IcebergRestCatalogEvents.BeforeRefreshViewEvent} */ + public void onBeforeRefreshView(IcebergRestCatalogEvents.BeforeRefreshViewEvent event) {} - /** {@link BeforeViewRefreshedEvent} */ - public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {} + /** {@link IcebergRestCatalogEvents.AfterRefreshViewEvent} */ + public void onAfterRefreshView(IcebergRestCatalogEvents.AfterRefreshViewEvent event) {} - /** {@link AfterViewRefreshedEvent} */ - public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {} + /** {@link BeforeAttemptTaskEvent} */ + public void onBeforeAttemptTask(BeforeAttemptTaskEvent event) {} - /** {@link BeforeTaskAttemptedEvent} */ - public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {} + /** {@link AfterAttemptTaskEvent} */ + public void onAfterAttemptTask(AfterAttemptTaskEvent event) {} - /** {@link AfterTaskAttemptedEvent} */ - public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {} + // Iceberg REST Catalog Namespace Events + /** {@link IcebergRestCatalogEvents.BeforeCreateNamespaceEvent} */ + public void 
onBeforeCreateNamespace(IcebergRestCatalogEvents.BeforeCreateNamespaceEvent event) {} - /** {@link AfterTableCreatedEvent} */ - public void onAfterTableCreated(AfterTableCreatedEvent event) {} + /** {@link IcebergRestCatalogEvents.AfterCreateNamespaceEvent} */ + public void onAfterCreateNamespace(IcebergRestCatalogEvents.AfterCreateNamespaceEvent event) {} - /** {@link AfterCatalogCreatedEvent} */ - public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) {} + /** {@link IcebergRestCatalogEvents.BeforeListNamespacesEvent} */ + public void onBeforeListNamespaces(IcebergRestCatalogEvents.BeforeListNamespacesEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterListNamespacesEvent} */ + public void onAfterListNamespaces(IcebergRestCatalogEvents.AfterListNamespacesEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent} */ + public void onBeforeLoadNamespaceMetadata( + IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent} */ + public void onAfterLoadNamespaceMetadata( + IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeCheckExistsNamespaceEvent} */ + public void onBeforeCheckExistsNamespace( + IcebergRestCatalogEvents.BeforeCheckExistsNamespaceEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterCheckExistsNamespaceEvent} */ + public void onAfterCheckExistsNamespace( + IcebergRestCatalogEvents.AfterCheckExistsNamespaceEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeDropNamespaceEvent} */ + public void onBeforeDropNamespace(IcebergRestCatalogEvents.BeforeDropNamespaceEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterDropNamespaceEvent} */ + public void onAfterDropNamespace(IcebergRestCatalogEvents.AfterDropNamespaceEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent} */ + public void 
onBeforeUpdateNamespaceProperties( + IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent} */ + public void onAfterUpdateNamespaceProperties( + IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent event) {} + + // Iceberg REST Catalog Table Events + /** {@link IcebergRestCatalogEvents.BeforeCreateTableEvent} */ + public void onBeforeCreateTable(IcebergRestCatalogEvents.BeforeCreateTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterCreateTableEvent} */ + public void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeListTablesEvent} */ + public void onBeforeListTables(IcebergRestCatalogEvents.BeforeListTablesEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterListTablesEvent} */ + public void onAfterListTables(IcebergRestCatalogEvents.AfterListTablesEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeLoadTableEvent} */ + public void onBeforeLoadTable(IcebergRestCatalogEvents.BeforeLoadTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterLoadTableEvent} */ + public void onAfterLoadTable(IcebergRestCatalogEvents.AfterLoadTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeCheckExistsTableEvent} */ + public void onBeforeCheckExistsTable( + IcebergRestCatalogEvents.BeforeCheckExistsTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterCheckExistsTableEvent} */ + public void onAfterCheckExistsTable(IcebergRestCatalogEvents.AfterCheckExistsTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeDropTableEvent} */ + public void onBeforeDropTable(IcebergRestCatalogEvents.BeforeDropTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterDropTableEvent} */ + public void onAfterDropTable(IcebergRestCatalogEvents.AfterDropTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeRegisterTableEvent} */ + 
public void onBeforeRegisterTable(IcebergRestCatalogEvents.BeforeRegisterTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterRegisterTableEvent} */ + public void onAfterRegisterTable(IcebergRestCatalogEvents.AfterRegisterTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeRenameTableEvent} */ + public void onBeforeRenameTable(IcebergRestCatalogEvents.BeforeRenameTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterRenameTableEvent} */ + public void onAfterRenameTable(IcebergRestCatalogEvents.AfterRenameTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeUpdateTableEvent} */ + public void onBeforeUpdateTable(IcebergRestCatalogEvents.BeforeUpdateTableEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterUpdateTableEvent} */ + public void onAfterUpdateTable(IcebergRestCatalogEvents.AfterUpdateTableEvent event) {} + + // Iceberg REST Catalog View Events + /** {@link IcebergRestCatalogEvents.BeforeCreateViewEvent} */ + public void onBeforeCreateView(IcebergRestCatalogEvents.BeforeCreateViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterCreateViewEvent} */ + public void onAfterCreateView(IcebergRestCatalogEvents.AfterCreateViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeListViewsEvent} */ + public void onBeforeListViews(IcebergRestCatalogEvents.BeforeListViewsEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterListViewsEvent} */ + public void onAfterListViews(IcebergRestCatalogEvents.AfterListViewsEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeLoadViewEvent} */ + public void onBeforeLoadView(IcebergRestCatalogEvents.BeforeLoadViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterLoadViewEvent} */ + public void onAfterLoadView(IcebergRestCatalogEvents.AfterLoadViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeCheckExistsViewEvent} */ + public void onBeforeCheckExistsView(IcebergRestCatalogEvents.BeforeCheckExistsViewEvent 
event) {} + + /** {@link IcebergRestCatalogEvents.AfterCheckExistsViewEvent} */ + public void onAfterCheckExistsView(IcebergRestCatalogEvents.AfterCheckExistsViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeDropViewEvent} */ + public void onBeforeDropView(IcebergRestCatalogEvents.BeforeDropViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterDropViewEvent} */ + public void onAfterDropView(IcebergRestCatalogEvents.AfterDropViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeRenameViewEvent} */ + public void onBeforeRenameView(IcebergRestCatalogEvents.BeforeRenameViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterRenameViewEvent} */ + public void onAfterRenameView(IcebergRestCatalogEvents.AfterRenameViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.BeforeReplaceViewEvent} */ + public void onBeforeReplaceView(IcebergRestCatalogEvents.BeforeReplaceViewEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterReplaceViewEvent} */ + public void onAfterReplaceView(IcebergRestCatalogEvents.AfterReplaceViewEvent event) {} + + // Iceberg REST Catalog Credential Events + /** {@link IcebergRestCatalogEvents.BeforeLoadCredentialsEvent} */ + public void onBeforeLoadCredentials(IcebergRestCatalogEvents.BeforeLoadCredentialsEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterLoadCredentialsEvent} */ + public void onAfterLoadCredentials(IcebergRestCatalogEvents.AfterLoadCredentialsEvent event) {} + + // Iceberg REST Catalog Transactions Events + /** {@link IcebergRestCatalogEvents.BeforeCommitTransactionEvent} */ + public void onBeforeCommitTransaction( + IcebergRestCatalogEvents.BeforeCommitTransactionEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterCommitTransactionEvent} */ + public void onAfterCommitTransaction( + IcebergRestCatalogEvents.AfterCommitTransactionEvent event) {} + + // Iceberg REST Catalog Notification Events + /** {@link IcebergRestCatalogEvents.BeforeSendNotificationEvent} 
*/ + public void onBeforeSendNotification( + IcebergRestCatalogEvents.BeforeSendNotificationEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterSendNotificationEvent} */ + public void onAfterSendNotification(IcebergRestCatalogEvents.AfterSendNotificationEvent event) {} + + // Iceberg REST Catalog Configuration Events + /** {@link IcebergRestCatalogEvents.BeforeGetConfigEvent} */ + public void onBeforeGetConfig(IcebergRestCatalogEvents.BeforeGetConfigEvent event) {} + + /** {@link IcebergRestCatalogEvents.AfterGetConfigEvent} */ + public void onAfterGetConfig(IcebergRestCatalogEvents.AfterGetConfigEvent event) {} // Catalog Policy Service Events /** {@link CatalogPolicyServiceEvents.BeforeCreatePolicyEvent} */ diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java index 4b30e8fa3..e9d43f003 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java @@ -21,61 +21,55 @@ package org.apache.polaris.service.events.listeners; import jakarta.annotation.Nullable; import java.util.Map; +import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.polaris.core.entity.PolarisEvent; -import org.apache.polaris.service.events.AfterCatalogCreatedEvent; -import org.apache.polaris.service.events.AfterTableCommitedEvent; -import org.apache.polaris.service.events.AfterTableCreatedEvent; -import org.apache.polaris.service.events.AfterTableRefreshedEvent; -import org.apache.polaris.service.events.AfterTaskAttemptedEvent; -import org.apache.polaris.service.events.AfterViewCommitedEvent; -import 
org.apache.polaris.service.events.AfterViewRefreshedEvent; -import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; -import org.apache.polaris.service.events.BeforeTableCommitedEvent; -import org.apache.polaris.service.events.BeforeTableRefreshedEvent; -import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; -import org.apache.polaris.service.events.BeforeViewCommitedEvent; -import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.AfterAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeLimitRequestRateEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; public abstract class PolarisPersistenceEventListener extends PolarisEventListener { - // TODO: Ensure all events (except RateLimiter ones) call `addToBuffer` + // TODO: Ensure all events (except RateLimiter ones) call `processEvent` @Override - public final void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) {} + public final void onBeforeLimitRequestRate(BeforeLimitRequestRateEvent event) {} @Override - public void onBeforeTableCommited(BeforeTableCommitedEvent event) {} + public void onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) {} @Override - public void onAfterTableCommited(AfterTableCommitedEvent event) {} + public void onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) {} @Override - public void onBeforeViewCommited(BeforeViewCommitedEvent event) {} + public void onBeforeCommitView(IcebergRestCatalogEvents.BeforeCommitViewEvent event) {} @Override - public void onAfterViewCommited(AfterViewCommitedEvent event) {} + public void onAfterCommitView(IcebergRestCatalogEvents.AfterCommitViewEvent event) {} @Override - public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) {} + public void onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent 
event) {} @Override - public void onAfterTableRefreshed(AfterTableRefreshedEvent event) {} + public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) {} @Override - public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) {} + public void onBeforeRefreshView(IcebergRestCatalogEvents.BeforeRefreshViewEvent event) {} @Override - public void onAfterViewRefreshed(AfterViewRefreshedEvent event) {} + public void onAfterRefreshView(IcebergRestCatalogEvents.AfterRefreshViewEvent event) {} @Override - public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) {} + public void onBeforeAttemptTask(BeforeAttemptTaskEvent event) {} @Override - public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) {} + public void onAfterAttemptTask(AfterAttemptTaskEvent event) {} @Override - public void onAfterTableCreated(AfterTableCreatedEvent event) { + public void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent event) { ContextSpecificInformation contextSpecificInformation = getContextSpecificInformation(); + TableMetadata tableMetadata = event.loadTableResponse().tableMetadata(); PolarisEvent polarisEvent = new PolarisEvent( event.catalogName(), @@ -85,33 +79,17 @@ public abstract class PolarisPersistenceEventListener extends PolarisEventListen contextSpecificInformation.timestamp(), contextSpecificInformation.principalName(), PolarisEvent.ResourceType.TABLE, - event.identifier().toString()); + TableIdentifier.of(event.namespace(), event.tableName()).toString()); Map<String, String> additionalParameters = Map.of( "table-uuid", - event.metadata().uuid(), + tableMetadata.uuid(), "metadata", - TableMetadataParser.toJson(event.metadata())); + TableMetadataParser.toJson(tableMetadata)); polarisEvent.setAdditionalProperties(additionalParameters); processEvent(polarisEvent); } - @Override - public void onAfterCatalogCreated(AfterCatalogCreatedEvent event) { - ContextSpecificInformation contextSpecificInformation = 
getContextSpecificInformation(); - PolarisEvent polarisEvent = - new PolarisEvent( - event.catalogName(), - org.apache.polaris.service.events.PolarisEvent.createEventId(), - getRequestId(), - event.getClass().getSimpleName(), - contextSpecificInformation.timestamp(), - contextSpecificInformation.principalName(), - PolarisEvent.ResourceType.CATALOG, - event.catalogName()); - processEvent(polarisEvent); - } - protected record ContextSpecificInformation(long timestamp, @Nullable String principalName) {} abstract ContextSpecificInformation getContextSpecificInformation(); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java index cf95452de..d0b9b5f92 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java @@ -23,17 +23,10 @@ import io.smallrye.common.annotation.Identifier; import jakarta.enterprise.context.ApplicationScoped; import java.util.ArrayList; import java.util.List; -import org.apache.polaris.service.events.AfterTableCommitedEvent; -import org.apache.polaris.service.events.AfterTableRefreshedEvent; -import org.apache.polaris.service.events.AfterTaskAttemptedEvent; -import org.apache.polaris.service.events.AfterViewCommitedEvent; -import org.apache.polaris.service.events.AfterViewRefreshedEvent; -import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; -import org.apache.polaris.service.events.BeforeTableCommitedEvent; -import org.apache.polaris.service.events.BeforeTableRefreshedEvent; -import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; -import org.apache.polaris.service.events.BeforeViewCommitedEvent; -import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import 
org.apache.polaris.service.events.AfterAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeLimitRequestRateEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.PolarisEvent; /** Event listener that stores all emitted events forever. Not recommended for use in production. */ @@ -48,57 +41,57 @@ public class TestPolarisEventListener extends PolarisEventListener { } @Override - public void onBeforeRequestRateLimited(BeforeRequestRateLimitedEvent event) { + public void onBeforeLimitRequestRate(BeforeLimitRequestRateEvent event) { history.add(event); } @Override - public void onBeforeTableCommited(BeforeTableCommitedEvent event) { + public void onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) { history.add(event); } @Override - public void onAfterTableCommited(AfterTableCommitedEvent event) { + public void onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) { history.add(event); } @Override - public void onBeforeViewCommited(BeforeViewCommitedEvent event) { + public void onBeforeCommitView(IcebergRestCatalogEvents.BeforeCommitViewEvent event) { history.add(event); } @Override - public void onAfterViewCommited(AfterViewCommitedEvent event) { + public void onAfterCommitView(IcebergRestCatalogEvents.AfterCommitViewEvent event) { history.add(event); } @Override - public void onBeforeTableRefreshed(BeforeTableRefreshedEvent event) { + public void onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) { history.add(event); } @Override - public void onAfterTableRefreshed(AfterTableRefreshedEvent event) { + public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) { history.add(event); } @Override - public void onBeforeViewRefreshed(BeforeViewRefreshedEvent event) { + public void 
onBeforeRefreshView(IcebergRestCatalogEvents.BeforeRefreshViewEvent event) { history.add(event); } @Override - public void onAfterViewRefreshed(AfterViewRefreshedEvent event) { + public void onAfterRefreshView(IcebergRestCatalogEvents.AfterRefreshViewEvent event) { history.add(event); } @Override - public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event) { + public void onBeforeAttemptTask(BeforeAttemptTaskEvent event) { history.add(event); } @Override - public void onAfterTaskAttempted(AfterTaskAttemptedEvent event) { + public void onAfterAttemptTask(AfterAttemptTaskEvent event) { history.add(event); } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java b/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java index 61d27a040..6ea95db4e 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/ratelimiter/RateLimiterFilter.java @@ -28,7 +28,7 @@ import jakarta.ws.rs.core.Response; import jakarta.ws.rs.ext.Provider; import java.io.IOException; import org.apache.polaris.service.config.FilterPriorities; -import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeLimitRequestRateEvent; import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +54,8 @@ public class RateLimiterFilter implements ContainerRequestFilter { @Override public void filter(ContainerRequestContext ctx) throws IOException { if (!rateLimiter.canProceed()) { - polarisEventListener.onBeforeRequestRateLimited( - new BeforeRequestRateLimitedEvent( + polarisEventListener.onBeforeLimitRequestRate( + new BeforeLimitRequestRateEvent( ctx.getMethod(), ctx.getUriInfo().getAbsolutePath().toString())); 
ctx.abortWith(Response.status(Response.Status.TOO_MANY_REQUESTS).build()); LOGGER.atDebug().log("Rate limiting request"); diff --git a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java index d425c1675..55847b8eb 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java @@ -43,8 +43,8 @@ import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.persistence.MetaStoreManagerFactory; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; -import org.apache.polaris.service.events.AfterTaskAttemptedEvent; -import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeAttemptTaskEvent; import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.tracing.TracingFilter; import org.slf4j.Logger; @@ -143,7 +143,7 @@ public class TaskExecutorImpl implements TaskExecutor { } protected void handleTask(long taskEntityId, CallContext ctx, int attempt) { - polarisEventListener.onBeforeTaskAttempted(new BeforeTaskAttemptedEvent(taskEntityId, attempt)); + polarisEventListener.onBeforeAttemptTask(new BeforeAttemptTaskEvent(taskEntityId, attempt)); boolean success = false; try { @@ -186,8 +186,8 @@ public class TaskExecutorImpl implements TaskExecutor { .log("Unable to execute async task"); } } finally { - polarisEventListener.onAfterTaskAttempted( - new AfterTaskAttemptedEvent(taskEntityId, attempt, success)); + polarisEventListener.onAfterAttemptTask( + new AfterAttemptTaskEvent(taskEntityId, attempt, success)); } } diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java index 1ec9d87ba..3d66c4858 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java @@ -137,10 +137,7 @@ import org.apache.polaris.service.catalog.io.ExceptionMappingFileIO; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.catalog.io.MeasuredFileIOFactory; import org.apache.polaris.service.config.ReservedProperties; -import org.apache.polaris.service.events.AfterTableCommitedEvent; -import org.apache.polaris.service.events.AfterTableRefreshedEvent; -import org.apache.polaris.service.events.BeforeTableCommitedEvent; -import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.events.listeners.TestPolarisEventListener; import org.apache.polaris.service.exception.FakeAzureHttpResponse; @@ -2229,22 +2226,26 @@ public abstract class AbstractIcebergCatalogTest extends CatalogTests<IcebergCat table.updateProperties().set(key, valOld).commit(); table.updateProperties().set(key, valNew).commit(); - var beforeRefreshEvent = testPolarisEventListener.getLatest(BeforeTableRefreshedEvent.class); + var beforeRefreshEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeRefreshTableEvent.class); Assertions.assertThat(beforeRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE); - var afterRefreshEvent = testPolarisEventListener.getLatest(AfterTableRefreshedEvent.class); + var afterRefreshEvent = + 
testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterRefreshTableEvent.class); Assertions.assertThat(afterRefreshEvent.tableIdentifier()).isEqualTo(TestData.TABLE); - var beforeTableEvent = testPolarisEventListener.getLatest(BeforeTableCommitedEvent.class); - Assertions.assertThat(beforeTableEvent.tableIdentifier()).isEqualTo(TestData.TABLE); + var beforeTableEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeCommitTableEvent.class); + Assertions.assertThat(beforeTableEvent.identifier()).isEqualTo(TestData.TABLE); Assertions.assertThat(beforeTableEvent.metadataBefore().properties().get(key)) .isEqualTo(valOld); Assertions.assertThat(beforeTableEvent.metadataAfter().properties().get(key)).isEqualTo(valNew); - var afterTableEvent = testPolarisEventListener.getLatest(AfterTableCommitedEvent.class); + var afterTableEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterCommitTableEvent.class); Assertions.assertThat(afterTableEvent.identifier()).isEqualTo(TestData.TABLE); - Assertions.assertThat(afterTableEvent.base().properties().get(key)).isEqualTo(valOld); - Assertions.assertThat(afterTableEvent.metadata().properties().get(key)).isEqualTo(valNew); + Assertions.assertThat(afterTableEvent.metadataBefore().properties().get(key)).isEqualTo(valOld); + Assertions.assertThat(afterTableEvent.metadataAfter().properties().get(key)).isEqualTo(valNew); } private static PageToken nextRequest(Page<?> previousPage) { diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java index f0e54ea3d..2514dc664 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogViewTest.java @@ -62,10 +62,7 @@ 
import org.apache.polaris.service.catalog.Profiles; import org.apache.polaris.service.catalog.io.DefaultFileIOFactory; import org.apache.polaris.service.catalog.io.FileIOFactory; import org.apache.polaris.service.config.ReservedProperties; -import org.apache.polaris.service.events.AfterViewCommitedEvent; -import org.apache.polaris.service.events.AfterViewRefreshedEvent; -import org.apache.polaris.service.events.BeforeViewCommitedEvent; -import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.events.listeners.TestPolarisEventListener; import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl; @@ -270,20 +267,27 @@ public abstract class AbstractIcebergCatalogViewTest extends ViewCatalogTests<Ic view.updateProperties().set(key, valOld).commit(); view.updateProperties().set(key, valNew).commit(); - var beforeRefreshEvent = testPolarisEventListener.getLatest(BeforeViewRefreshedEvent.class); + var beforeRefreshEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeRefreshViewEvent.class); Assertions.assertThat(beforeRefreshEvent.viewIdentifier()).isEqualTo(TestData.TABLE); - var afterRefreshEvent = testPolarisEventListener.getLatest(AfterViewRefreshedEvent.class); + var afterRefreshEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterRefreshViewEvent.class); Assertions.assertThat(afterRefreshEvent.viewIdentifier()).isEqualTo(TestData.TABLE); - var beforeCommitEvent = testPolarisEventListener.getLatest(BeforeViewCommitedEvent.class); + var beforeCommitEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.BeforeCommitViewEvent.class); Assertions.assertThat(beforeCommitEvent.identifier()).isEqualTo(TestData.TABLE); - Assertions.assertThat(beforeCommitEvent.base().properties().get(key)).isEqualTo(valOld); - 
Assertions.assertThat(beforeCommitEvent.metadata().properties().get(key)).isEqualTo(valNew); + Assertions.assertThat(beforeCommitEvent.metadataBefore().properties().get(key)) + .isEqualTo(valOld); + Assertions.assertThat(beforeCommitEvent.metadataAfter().properties().get(key)) + .isEqualTo(valNew); - var afterCommitEvent = testPolarisEventListener.getLatest(AfterViewCommitedEvent.class); + var afterCommitEvent = + testPolarisEventListener.getLatest(IcebergRestCatalogEvents.AfterCommitViewEvent.class); Assertions.assertThat(afterCommitEvent.identifier()).isEqualTo(TestData.TABLE); - Assertions.assertThat(afterCommitEvent.base().properties().get(key)).isEqualTo(valOld); - Assertions.assertThat(afterCommitEvent.metadata().properties().get(key)).isEqualTo(valNew); + Assertions.assertThat(afterCommitEvent.metadataBefore().properties().get(key)) + .isEqualTo(valOld); + Assertions.assertThat(afterCommitEvent.metadataAfter().properties().get(key)).isEqualTo(valNew); } } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java index 88a8ad44b..ffc157b9d 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java @@ -40,7 +40,7 @@ import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.context.CallContext; import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizer; -import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; import org.awaitility.Awaitility; 
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -203,8 +203,8 @@ class AwsCloudWatchEventListenerTest { try { // Create and send a test event TableIdentifier testTable = TableIdentifier.of("test_namespace", "test_table"); - AfterTableRefreshedEvent event = new AfterTableRefreshedEvent("test_catalog", testTable); - listener.onAfterTableRefreshed(event); + listener.onAfterRefreshTable( + new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog", testTable)); Awaitility.await("expected amount of records should be sent to CloudWatch") .atMost(Duration.ofSeconds(30)) @@ -238,7 +238,9 @@ class AwsCloudWatchEventListenerTest { logEvent -> { String message = logEvent.message(); assertThat(message).contains(REALM); - assertThat(message).contains(AfterTableRefreshedEvent.class.getSimpleName()); + assertThat(message) + .contains( + IcebergRestCatalogEvents.AfterRefreshTableEvent.class.getSimpleName()); assertThat(message).contains(TEST_USER); assertThat(message).contains(testTable.toString()); }); @@ -260,9 +262,8 @@ class AwsCloudWatchEventListenerTest { try { // Create and send a test event synchronously TableIdentifier syncTestTable = TableIdentifier.of("test_namespace", "test_table_sync"); - AfterTableRefreshedEvent syncEvent = - new AfterTableRefreshedEvent("test_catalog", syncTestTable); - syncListener.onAfterTableRefreshed(syncEvent); + syncListener.onAfterRefreshTable( + new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog", syncTestTable)); Awaitility.await("expected amount of records should be sent to CloudWatch") .atMost(Duration.ofSeconds(30)) @@ -298,7 +299,7 @@ class AwsCloudWatchEventListenerTest { logEvent -> { String message = logEvent.message(); assertThat(message).contains("test_table_sync"); - assertThat(message).contains("AfterTableRefreshedEvent"); + assertThat(message).contains("AfterRefreshTableEvent"); }); } finally { // Clean up diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java b/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java index d0c2a34f1..4d5c92559 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/ratelimiter/RateLimiterFilterTest.java @@ -32,7 +32,7 @@ import java.time.Duration; import java.util.Map; import java.util.Set; import java.util.function.Consumer; -import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeLimitRequestRateEvent; import org.apache.polaris.service.events.listeners.PolarisEventListener; import org.apache.polaris.service.events.listeners.TestPolarisEventListener; import org.apache.polaris.service.ratelimiter.RateLimiterFilterTest.Profile; @@ -148,9 +148,9 @@ public class RateLimiterFilterTest { } requestAsserter.accept(Status.TOO_MANY_REQUESTS); - BeforeRequestRateLimitedEvent event = + BeforeLimitRequestRateEvent event = ((TestPolarisEventListener) polarisEventListener) - .getLatest(BeforeRequestRateLimitedEvent.class); + .getLatest(BeforeLimitRequestRateEvent.class); assertThat(event.method()).isEqualTo("GET"); // Examples of expected metrics: diff --git a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java index 56cb22650..3d59c481a 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java @@ -24,8 +24,8 @@ import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.entity.TaskEntity; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.service.TestServices; 
-import org.apache.polaris.service.events.AfterTaskAttemptedEvent; -import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeAttemptTaskEvent; import org.apache.polaris.service.events.listeners.TestPolarisEventListener; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -77,7 +77,7 @@ public class TaskExecutorImplTest { @Override public boolean handleTask(TaskEntity task, CallContext callContext) { var beforeTaskAttemptedEvent = - testPolarisEventListener.getLatest(BeforeTaskAttemptedEvent.class); + testPolarisEventListener.getLatest(BeforeAttemptTaskEvent.class); Assertions.assertEquals(taskEntity.getId(), beforeTaskAttemptedEvent.taskEntityId()); Assertions.assertEquals(attempt, beforeTaskAttemptedEvent.attempt()); return true; @@ -86,8 +86,7 @@ public class TaskExecutorImplTest { executor.handleTask(taskEntity.getId(), polarisCallCtx, attempt); - var afterAttemptTaskEvent = testPolarisEventListener.getLatest(AfterTaskAttemptedEvent.class); - + var afterAttemptTaskEvent = testPolarisEventListener.getLatest(AfterAttemptTaskEvent.class); Assertions.assertEquals(taskEntity.getId(), afterAttemptTaskEvent.taskEntityId()); Assertions.assertEquals(attempt, afterAttemptTaskEvent.attempt()); Assertions.assertTrue(afterAttemptTaskEvent.success());