adutra commented on code in PR #3293: URL: https://github.com/apache/polaris/pull/3293#discussion_r2630943229
########## runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java: ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import java.util.Map; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.CreateViewRequest; +import org.apache.iceberg.rest.requests.RegisterTableRequest; +import org.apache.iceberg.rest.requests.RenameTableRequest; +import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.iceberg.rest.responses.ConfigResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.rest.responses.LoadViewResponse; +import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.polaris.core.admin.model.AddGrantRequest; +import org.apache.polaris.core.admin.model.Catalog; +import org.apache.polaris.core.admin.model.CatalogRole; +import org.apache.polaris.core.admin.model.CreatePrincipalRequest; +import org.apache.polaris.core.admin.model.CreatePrincipalRoleRequest; +import org.apache.polaris.core.admin.model.GrantResource; +import org.apache.polaris.core.admin.model.Principal; +import org.apache.polaris.core.admin.model.PrincipalRole; +import org.apache.polaris.core.admin.model.PrincipalWithCredentials; +import org.apache.polaris.core.admin.model.RevokeGrantRequest; +import org.apache.polaris.core.admin.model.UpdateCatalogRequest; +import org.apache.polaris.core.admin.model.UpdateCatalogRoleRequest; +import org.apache.polaris.core.admin.model.UpdatePrincipalRequest; +import org.apache.polaris.core.admin.model.UpdatePrincipalRoleRequest; +import org.apache.polaris.core.entity.PolarisPrivilege; +import org.apache.polaris.service.types.AttachPolicyRequest; +import org.apache.polaris.service.types.CommitViewRequest; +import org.apache.polaris.service.types.CreateGenericTableRequest; +import org.apache.polaris.service.types.CreatePolicyRequest; +import org.apache.polaris.service.types.DetachPolicyRequest; +import org.apache.polaris.service.types.GenericTable; +import org.apache.polaris.service.types.GetApplicablePoliciesResponse; +import org.apache.polaris.service.types.LoadPolicyResponse; +import org.apache.polaris.service.types.NotificationRequest; +import org.apache.polaris.service.types.UpdatePolicyRequest; + +/** + * Standard attribute keys for Polaris events. These keys provide type-safe access to common event + * attributes and enable automatic pruning/filtering logic. + */ +public final class EventAttributes { + private EventAttributes() {} + + // Catalog attributes + public static final AttributeKey<String> CATALOG_NAME = + AttributeKey.of("catalog_name", String.class); + public static final AttributeKey<Catalog> CATALOG = AttributeKey.of("catalog", Catalog.class); + public static final AttributeKey<UpdateCatalogRequest> UPDATE_CATALOG_REQUEST = + AttributeKey.of("update_catalog_request", UpdateCatalogRequest.class); + + // Namespace attributes + public static final AttributeKey<Namespace> NAMESPACE = + AttributeKey.of("namespace", Namespace.class); + public static final AttributeKey<String> NAMESPACE_STRING = Review Comment: Isn't this the same as `NAMESPACE_NAME` below? Or is this rather the "fully-qualified" namespace name while the other is the simple name? ########## runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java: ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events; + +import java.util.Map; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; +import org.apache.iceberg.rest.requests.CreateNamespaceRequest; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.CreateViewRequest; +import org.apache.iceberg.rest.requests.RegisterTableRequest; +import org.apache.iceberg.rest.requests.RenameTableRequest; +import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.iceberg.rest.responses.ConfigResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.rest.responses.LoadViewResponse; +import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; +import org.apache.iceberg.view.ViewMetadata; +import org.apache.polaris.core.admin.model.AddGrantRequest; +import org.apache.polaris.core.admin.model.Catalog; +import org.apache.polaris.core.admin.model.CatalogRole; +import org.apache.polaris.core.admin.model.CreatePrincipalRequest; +import org.apache.polaris.core.admin.model.CreatePrincipalRoleRequest; +import org.apache.polaris.core.admin.model.GrantResource; +import org.apache.polaris.core.admin.model.Principal; +import org.apache.polaris.core.admin.model.PrincipalRole; +import org.apache.polaris.core.admin.model.PrincipalWithCredentials; +import org.apache.polaris.core.admin.model.RevokeGrantRequest; +import org.apache.polaris.core.admin.model.UpdateCatalogRequest; +import org.apache.polaris.core.admin.model.UpdateCatalogRoleRequest; +import org.apache.polaris.core.admin.model.UpdatePrincipalRequest; +import org.apache.polaris.core.admin.model.UpdatePrincipalRoleRequest; +import org.apache.polaris.core.entity.PolarisPrivilege; +import org.apache.polaris.service.types.AttachPolicyRequest; +import org.apache.polaris.service.types.CommitViewRequest; +import org.apache.polaris.service.types.CreateGenericTableRequest; +import org.apache.polaris.service.types.CreatePolicyRequest; +import org.apache.polaris.service.types.DetachPolicyRequest; +import org.apache.polaris.service.types.GenericTable; +import org.apache.polaris.service.types.GetApplicablePoliciesResponse; +import org.apache.polaris.service.types.LoadPolicyResponse; +import org.apache.polaris.service.types.NotificationRequest; +import org.apache.polaris.service.types.UpdatePolicyRequest; + +/** + * Standard attribute keys for Polaris events. These keys provide type-safe access to common event + * attributes and enable automatic pruning/filtering logic. + */ +public final class EventAttributes { + private EventAttributes() {} + + // Catalog attributes + public static final AttributeKey<String> CATALOG_NAME = + AttributeKey.of("catalog_name", String.class); + public static final AttributeKey<Catalog> CATALOG = AttributeKey.of("catalog", Catalog.class); + public static final AttributeKey<UpdateCatalogRequest> UPDATE_CATALOG_REQUEST = + AttributeKey.of("update_catalog_request", UpdateCatalogRequest.class); + + // Namespace attributes + public static final AttributeKey<Namespace> NAMESPACE = + AttributeKey.of("namespace", Namespace.class); + public static final AttributeKey<String> NAMESPACE_STRING = + AttributeKey.of("namespace_string", String.class); + public static final AttributeKey<String> PARENT_NAMESPACE = + AttributeKey.of("parent_namespace", String.class); + public static final AttributeKey<CreateNamespaceRequest> CREATE_NAMESPACE_REQUEST = + AttributeKey.of("create_namespace_request", CreateNamespaceRequest.class); + public static final AttributeKey<UpdateNamespacePropertiesRequest> + UPDATE_NAMESPACE_PROPERTIES_REQUEST = + AttributeKey.of( + "update_namespace_properties_request", UpdateNamespacePropertiesRequest.class); + public static final AttributeKey<UpdateNamespacePropertiesResponse> + UPDATE_NAMESPACE_PROPERTIES_RESPONSE = + AttributeKey.of( + "update_namespace_properties_response", UpdateNamespacePropertiesResponse.class); + + @SuppressWarnings("unchecked") + public static final AttributeKey<Map<String, String>> NAMESPACE_PROPERTIES = + (AttributeKey<Map<String, String>>) + (AttributeKey<?>) AttributeKey.of("namespace_properties", Map.class); Review Comment: I'm wondering: in order to increase portability, shouldn't we restrict the types of attributes that we can put in this map? An attribute of type `Map<String, String>` is still OK, I guess, but what if the attribute doesn't have a clear serialized format, e.g. `Optional` or `Function`? Restricting to types that can be safely serialized to Json (and maybe gRPC) would imho make things easier for listener implementors. ########## runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java: ########## @@ -37,10 +40,15 @@ protected abstract void transformAndSendEvent( HashMap<String, Object> properties, PolarisEventMetadata metadata); @Override - public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) { - HashMap<String, Object> properties = new HashMap<>(); - properties.put("event_type", event.getClass().getSimpleName()); - properties.put("table_identifier", event.tableIdentifier().toString()); - transformAndSendEvent(properties, event.metadata()); + public void onEvent(PolarisEvent event) { Review Comment: With the flattened hierarchy, we need to ask ourselves if this class is still useful, as opposed to just passing the `PolarisEvent` as is to the CloudWatch sink and let it serialize it without the need for an intermediary representation in the form of a Map. ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java: ########## @@ -55,24 +82,38 @@ public void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent ev processEvent(event.metadata().realmId(), polarisEvent); } - @Override - public void onAfterCreateCatalog(CatalogsServiceEvents.AfterCreateCatalogEvent event) { - PolarisEvent polarisEvent = - new PolarisEvent( - event.catalog().getName(), + private void handleAfterCreateCatalog(PolarisEvent event) { + Catalog catalog = getRequiredAttribute(event, EventAttributes.CATALOG); + if (catalog == null) { + return; + } + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new org.apache.polaris.core.entity.PolarisEvent( + catalog.getName(), event.metadata().eventId().toString(), event.metadata().requestId().orElse(null), - event.getClass().getSimpleName(), + event.type().name(), event.metadata().timestamp().toEpochMilli(), event.metadata().user().map(PolarisPrincipal::getName).orElse(null), - PolarisEvent.ResourceType.CATALOG, - event.catalog().getName()); + org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG, + catalog.getName()); Map<String, String> openTelemetryContext = event.metadata().openTelemetryContext(); if (!openTelemetryContext.isEmpty()) { polarisEvent.setAdditionalProperties(openTelemetryContext); } processEvent(event.metadata().realmId(), polarisEvent); } - protected abstract void processEvent(String realmId, PolarisEvent event); + private <T> T getRequiredAttribute(PolarisEvent event, AttributeKey<T> key) { Review Comment: This could be declared in `PolarisEvent`, I think it's going to be useful for many listeners, not just this one. ########## runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java: ########## @@ -23,29 +23,56 @@ import java.util.Map; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.polaris.core.admin.model.Catalog; import org.apache.polaris.core.auth.PolarisPrincipal; -import org.apache.polaris.core.entity.PolarisEvent; -import org.apache.polaris.service.events.CatalogsServiceEvents; -import org.apache.polaris.service.events.IcebergRestCatalogEvents; +import org.apache.polaris.service.events.AttributeKey; +import org.apache.polaris.service.events.EventAttributes; +import org.apache.polaris.service.events.PolarisEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class PolarisPersistenceEventListener implements PolarisEventListener { - // TODO: Ensure all events (except RateLimiter ones) call `processEvent` + private static final Logger LOGGER = + LoggerFactory.getLogger(PolarisPersistenceEventListener.class); @Override - public void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent event) { - TableMetadata tableMetadata = event.loadTableResponse().tableMetadata(); - PolarisEvent polarisEvent = - new PolarisEvent( - event.catalogName(), + public void onEvent(PolarisEvent event) { + switch (event.type()) { + case AFTER_CREATE_TABLE -> handleAfterCreateTable(event); + case AFTER_CREATE_CATALOG -> handleAfterCreateCatalog(event); + default -> { + // Other events not handled by this listener + } + } + } + + private void handleAfterCreateTable(PolarisEvent event) { Review Comment: Again, with flattened events hierarchy there is not a huge difference between these two `handleXYZ` methods. It would certainly be possible to create a unified `handleEvent` method that is valid for all events, thus saving us the hassle of writing 150+ methods (just a thought for later, not for this PR though). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
