adutra commented on code in PR #4225:
URL: https://github.com/apache/polaris/pull/4225#discussion_r3379179465


##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -30,11 +30,30 @@
 public class PolarisEvent {
   public static final String EMPTY_MAP_STRING = "{}";
 
+  /**
+   * Sentinel stored in {@link #catalogId} for events that are not scoped to a 
specific catalog
+   * (e.g., principal, principal-role, or other realm-level operations). 
Downstream consumers
+   * querying by catalog must filter this sentinel out (or coalesce it to 
{@code NULL}).
+   *
+   * <p>This is a transitional placeholder: {@code catalogId} is currently 
{@code NOT NULL} in the
+   * persistence schema, so realm-scoped events cannot store {@code null} 
directly. A future schema
+   * migration may relax that constraint and let realm-scoped events use 
{@code NULL} instead.
+   *
+   * <p>TODO: Remove this sentinel once the {@code catalogId} column is made 
nullable. Realm-scoped
+   * events should store {@code NULL}, not a magic string, so downstream 
queries can use {@code IS
+   * NULL} instead of filtering out a reserved value.
+   */
+  public static final String REALM_SCOPED = "__realm__";

Review Comment:
   Do we have a GitHub issue to track this?



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -60,34 +55,256 @@ private void handleAfterCreateTable(PolarisEvent event) {
             event.type().name(),
             event.metadata().timestamp().toEpochMilli(),
             
event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
-            org.apache.polaris.core.entity.PolarisEvent.ResourceType.TABLE,
-            TableIdentifier.of(namespace, tableName).toString());
-    var additionalParameters =
-        ImmutableMap.<String, String>builder()
-            .put("table-uuid", tableMetadata.uuid())
-            .put("metadata", TableMetadataParser.toJson(tableMetadata));
-    additionalParameters.putAll(event.metadata().openTelemetryContext());
-    polarisEvent.setAdditionalProperties(additionalParameters.build());
+            resourceType,
+            resourceIdentifier);
+
+    Map<String, String> additionalProperties = 
buildAdditionalProperties(event);
+    if (!additionalProperties.isEmpty()) {
+      polarisEvent.setAdditionalProperties(additionalProperties);
+    }
+
     processEvent(event.metadata().realmId(), polarisEvent);
   }
 
-  private void handleAfterCreateCatalog(PolarisEvent event) {
-    Catalog catalog = event.attributes().getRequired(EventAttributes.CATALOG);
-    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
-        new org.apache.polaris.core.entity.PolarisEvent(
-            catalog.getName(),
-            event.metadata().eventId().toString(),
-            event.metadata().requestId().orElse(null),
-            event.type().name(),
-            event.metadata().timestamp().toEpochMilli(),
-            
event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
-            org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG,
-            catalog.getName());
-    Map<String, String> openTelemetryContext = 
event.metadata().openTelemetryContext();
-    if (!openTelemetryContext.isEmpty()) {
-      polarisEvent.setAdditionalProperties(openTelemetryContext);
+  private static String resolveCatalogName(PolarisEvent event) {
+    return event
+        .attributes()
+        .get(EventAttributes.CATALOG_NAME)
+        .orElse(org.apache.polaris.core.entity.PolarisEvent.REALM_SCOPED);
+  }
+
+  /**
+   * Maps an event's category to a {@link ResourceType} for indexing. 
Exhaustive over all {@link
+   * PolarisEventType.Category} values: any new category added to the enum 
must be classified here
+   * explicitly. The {@code default} arm is intentionally absent so that 
adding a category produces
+   * a compile-time error rather than silently routing to {@link 
ResourceType#REALM}.
+   */
+  private static ResourceType resolveResourceType(PolarisEventType eventType) {
+    return switch (eventType.category()) {
+      case TABLE, GENERIC_TABLE -> ResourceType.TABLE;
+      case VIEW -> ResourceType.VIEW;
+      case NAMESPACE -> ResourceType.NAMESPACE;
+      case CATALOG -> ResourceType.CATALOG;
+      case POLICY,
+          PRINCIPAL,
+          PRINCIPAL_ROLE,
+          CATALOG_ROLE,
+          CREDENTIAL,
+          TRANSACTION,
+          NOTIFICATION,
+          CONFIG,
+          TASK_EXECUTION,
+          RATE_LIMITING ->
+          ResourceType.REALM;
+    };
+  }
+
+  private static String resolveResourceIdentifier(
+      PolarisEvent event, ResourceType resourceType, String catalogName) {
+    return switch (resourceType) {
+      case TABLE -> resolveTableResourceIdentifier(event, catalogName);
+      case VIEW -> resolveViewResourceIdentifier(event, catalogName);
+      case NAMESPACE -> resolveNamespaceResourceIdentifier(event, catalogName);
+      case CATALOG -> resolveCatalogResourceIdentifier(event, catalogName);
+      case REALM -> resolveRealmResourceIdentifier(event);
+    };
+  }
+
+  private static String resolveTableResourceIdentifier(PolarisEvent event, 
String catalogName) {
+    EventAttributeMap attributes = event.attributes();
+
+    Optional<String> identifierFromTableAttribute =
+        
attributes.get(EventAttributes.TABLE_IDENTIFIER).map(TableIdentifier::toString);
+    if (identifierFromTableAttribute.isPresent()) {
+      return identifierFromTableAttribute.get();
     }
-    processEvent(event.metadata().realmId(), polarisEvent);
+
+    Optional<String> renameIdentifier = resolveRenameIdentifier(event);
+    if (renameIdentifier.isPresent()) {
+      return renameIdentifier.get();
+    }
+
+    Optional<String> namespaceAndTableName = 
resolveNamespaceAndTableName(attributes);
+    if (namespaceAndTableName.isPresent()) {
+      return namespaceAndTableName.get();
+    }
+
+    Optional<String> namespaceAndCreateTableName = 
resolveCreateTableIdentifier(attributes);
+    if (namespaceAndCreateTableName.isPresent()) {
+      return namespaceAndCreateTableName.get();
+    }
+
+    Optional<String> namespaceAndRegisterTableName = 
resolveRegisterTableIdentifier(attributes);
+    if (namespaceAndRegisterTableName.isPresent()) {
+      return namespaceAndRegisterTableName.get();
+    }
+
+    Optional<String> namespaceIdentifier =
+        attributes.get(EventAttributes.NAMESPACE).map(Namespace::toString);
+    if (namespaceIdentifier.isPresent()) {
+      return namespaceIdentifier.get();
+    }
+
+    return attributes
+        .get(EventAttributes.TABLE_NAME)
+        .orElseGet(() -> fallbackResourceIdentifier(event, catalogName));
+  }
+
+  private static String resolveViewResourceIdentifier(PolarisEvent event, 
String catalogName) {
+    EventAttributeMap attributes = event.attributes();
+    return attributes
+        .get(EventAttributes.VIEW_IDENTIFIER)
+        .map(TableIdentifier::toString)
+        .or(() -> resolveRenameIdentifier(event))
+        .or(
+            () ->
+                attributes
+                    .get(EventAttributes.NAMESPACE)
+                    .flatMap(
+                        namespace ->
+                            attributes
+                                .get(EventAttributes.VIEW_NAME)
+                                .map(
+                                    viewName ->
+                                        TableIdentifier.of(namespace, 
viewName).toString())))
+        .or(() -> attributes.get(EventAttributes.VIEW_NAME))
+        .orElseGet(() -> fallbackResourceIdentifier(event, catalogName));
+  }
+
+  private static Optional<String> resolveRenameIdentifier(PolarisEvent event) {
+    return event
+        .attributes()
+        .get(EventAttributes.RENAME_TABLE_REQUEST)
+        .map(
+            request ->
+                event.type().name().startsWith("AFTER_")
+                    ? request.destination().toString()
+                    : request.source().toString());
+  }
+
+  private static Optional<String> 
resolveNamespaceAndTableName(EventAttributeMap attributes) {
+    return attributes
+        .get(EventAttributes.NAMESPACE)
+        .flatMap(
+            namespace ->
+                attributes
+                    .get(EventAttributes.TABLE_NAME)
+                    .map(tableName -> TableIdentifier.of(namespace, 
tableName).toString()));
+  }
+
+  private static Optional<String> 
resolveCreateTableIdentifier(EventAttributeMap attributes) {
+    return attributes
+        .get(EventAttributes.NAMESPACE)
+        .flatMap(
+            namespace ->
+                attributes
+                    .get(EventAttributes.CREATE_TABLE_REQUEST)
+                    .map(CreateTableRequest::name)
+                    .map(tableName -> TableIdentifier.of(namespace, 
tableName).toString()));
+  }
+
+  private static Optional<String> 
resolveRegisterTableIdentifier(EventAttributeMap attributes) {
+    return attributes
+        .get(EventAttributes.NAMESPACE)
+        .flatMap(
+            namespace ->
+                attributes
+                    .get(EventAttributes.REGISTER_TABLE_REQUEST)
+                    .map(RegisterTableRequest::name)
+                    .map(tableName -> TableIdentifier.of(namespace, 
tableName).toString()));
+  }
+
+  private static String resolveNamespaceResourceIdentifier(PolarisEvent event, 
String catalogName) {
+    EventAttributeMap attributes = event.attributes();
+    return attributes
+        .get(EventAttributes.NAMESPACE)
+        .map(Namespace::toString)
+        .or(() -> attributes.get(EventAttributes.NAMESPACE_FQN))
+        .or(() -> attributes.get(EventAttributes.PARENT_NAMESPACE_FQN))
+        .orElseGet(() -> fallbackResourceIdentifier(event, catalogName));
+  }
+
+  private static String resolveCatalogResourceIdentifier(PolarisEvent event, 
String catalogName) {
+    return event
+        .attributes()
+        .get(EventAttributes.CATALOG_NAME)
+        .orElseGet(() -> fallbackResourceIdentifier(event, catalogName));
+  }
+
+  private static String resolveRealmResourceIdentifier(PolarisEvent event) {
+    return event.type().name();
+  }
+
+  private static String fallbackResourceIdentifier(PolarisEvent event, String 
catalogName) {
+    if 
(!org.apache.polaris.core.entity.PolarisEvent.REALM_SCOPED.equals(catalogName)) 
{
+      return catalogName;
+    }
+    return event.type().name();
+  }
+
+  private static Map<String, String> buildAdditionalProperties(PolarisEvent 
event) {
+    Map<String, String> additionalProperties =
+        new LinkedHashMap<>(event.metadata().openTelemetryContext());
+    event.attributes().forEach((key, value) -> 
additionalProperties.putAll(prune(key, value)));
+    return additionalProperties;
+  }
+
+  /**
+   * Reduces an attribute value to a bounded string representation suitable 
for storage in the
+   * persistence event row. Inlined here (rather than exposed as a swappable 
bean) because pruning
+   * is a serialization concern of this listener only — other listeners are 
free to render the same
+   * value differently.

Review Comment:
   Although, this is probably a shared concern among listeners. A Kafka 
listener probably doesn't want to insert a 1GB record in a topic because of a 
huge table metadata object. 
   
   It's fine to leave this here for now, but I'd like to think about a 
configurable, listener-agnostic pruning layer. See my other comments about how 
this could be turned into a sanitizer.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/RawEventAccess.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+/**
+ * Marker interface signalling that a {@link PolarisEventListener} requires 
the raw, unsanitized
+ * event. Listeners that implement this interface receive the original event 
from {@link
+ * org.apache.polaris.service.events.PolarisEventListeners#deliverEvent}, 
bypassing the global
+ * {@link org.apache.polaris.service.events.EventSanitizer}.
+ *
+ * <p>This is an explicit security opt-out. Implement only when the listener 
has a documented need
+ * for sensitive attributes (for example, a forensic audit sink that records 
principal credentials
+ * or raw catalog configuration). All other listeners receive the sanitized 
event by default.
+ */
+public interface RawEventAccess {}

Review Comment:
   I am a bit against this interface tbh. To opt out (or in) from security, you 
need to rebuild your listener and re-deploy your application. Moreover, what if 
a listener wants to keep deny-listing on, but table metadata pruning off? One 
single interface will not cover that.
   
   I would prefer instead a configuration-based approach.
   
   For instance, we could expand your `EventSanitizer` interface, which 
currently focuses on deny-listing attributes, to encompass a whole set of 
sanitization procedures – each concrete implementation would be an 
`@ApplicationScoped` bean:
   
   - `DenyListEventSanitizer` : does what `DefaultEventSanitizer` does today
   - `TableMetadataSanitizer`: prunes the `TABLE_METADATA` attribute
   - `DerivedAttributeSanitizer` : enriches the event with derived attributes 
(e.g. extract the target entity ID from a catalog or table-metadata attribute)
   -  etc.
   
   Each sanitizer would be independently configurable, for example:
   
   ```properties
   polaris.event-listener.sanitizer.deny-list.attributes=attr1,attr2,attr3
   polaris.event-listener.sanitizer.table-metadata.max-payload-size=10Mb
   polaris.event-listener.sanitizer.table-metadata.include-schema=false
   ```
   
   Then each listener would declare the sanitizers it wants, pretty much like 
it declares today which event types and categories it wants:
   
   ```properties
   polaris.event-listener.<name>.enabled-event-types=...
   polaris.event-listener.<name>.enabled-event-categories=...
   
polaris.event-listener.<name>.enabled-event-sanitizers=deny-list,table-metadata,...
   ```
   
   Then finally, Polaris would leverage CDI to discover all the sanitizer beans 
available, and would leverage the event bus to create a pipeline where the 
event would be first dispatched to intermediate addresses, where it would pass 
from sanitizer to sanitizer, then forwarded to its final address where it would 
be handed off to the listener.
   
   



##########
runtime/service/src/main/java/org/apache/polaris/service/events/DefaultEventSanitizer.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import io.quarkus.arc.DefaultBean;
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.polaris.core.admin.model.Catalog;
+
+/**
+ * Default implementation of {@link EventSanitizer}. Drops attributes whose 
keys are on the denylist
+ * and derives safe attributes (e.g., {@code CATALOG_NAME} extracted from a 
denied {@code CATALOG}
+ * object) from the originals. The built-in denylist covers attributes that 
carry credentials or
+ * internal secrets and may be extended via {@code 
polaris.event-listener.denylisted-attributes}.
+ *
+ * <p>Operators who need a different denial or derivation policy should 
replace this bean.
+ */
+@ApplicationScoped
+@DefaultBean
+public class DefaultEventSanitizer implements EventSanitizer {
+
+  // These carry full domain objects containing credentials or internal 
secrets:
+  // PRINCIPAL - may contain client IDs, internal identity metadata
+  // UPDATE_PRINCIPAL_REQUEST - credential rotation data, secret changes
+  // CATALOG - includes StorageConfigInfo with cloud credentials (AWS keys, 
Azure tokens, etc.)
+  // NOTIFICATION_REQUEST - contains metadata file locations and potentially 
signed URLs
+  static final Set<AttributeKey<?>> DEFAULT_DENYLIST =
+      Set.of(
+          EventAttributes.PRINCIPAL,
+          EventAttributes.UPDATE_PRINCIPAL_REQUEST,
+          EventAttributes.CATALOG,
+          EventAttributes.NOTIFICATION_REQUEST);
+
+  @Inject PolarisEventListenerConfiguration configuration;
+
+  private Set<AttributeKey<?>> denylist;
+
+  DefaultEventSanitizer() {
+    this(DEFAULT_DENYLIST);
+  }
+
+  DefaultEventSanitizer(Set<AttributeKey<?>> denylist) {

Review Comment:
   nit: Maybe annotate with `@VisibleForTesting` if you really intended this 
constructor to be package-private. If you intended instead to make this a 
constructor for subclasses to customize the deny list, then you should make it 
protected.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -60,34 +55,256 @@ private void handleAfterCreateTable(PolarisEvent event) {
             event.type().name(),
             event.metadata().timestamp().toEpochMilli(),
             
event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
-            org.apache.polaris.core.entity.PolarisEvent.ResourceType.TABLE,
-            TableIdentifier.of(namespace, tableName).toString());
-    var additionalParameters =
-        ImmutableMap.<String, String>builder()
-            .put("table-uuid", tableMetadata.uuid())
-            .put("metadata", TableMetadataParser.toJson(tableMetadata));
-    additionalParameters.putAll(event.metadata().openTelemetryContext());
-    polarisEvent.setAdditionalProperties(additionalParameters.build());
+            resourceType,
+            resourceIdentifier);
+
+    Map<String, String> additionalProperties = 
buildAdditionalProperties(event);
+    if (!additionalProperties.isEmpty()) {
+      polarisEvent.setAdditionalProperties(additionalProperties);
+    }
+
     processEvent(event.metadata().realmId(), polarisEvent);
   }
 
-  private void handleAfterCreateCatalog(PolarisEvent event) {
-    Catalog catalog = event.attributes().getRequired(EventAttributes.CATALOG);
-    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
-        new org.apache.polaris.core.entity.PolarisEvent(
-            catalog.getName(),
-            event.metadata().eventId().toString(),
-            event.metadata().requestId().orElse(null),
-            event.type().name(),
-            event.metadata().timestamp().toEpochMilli(),
-            
event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
-            org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG,
-            catalog.getName());
-    Map<String, String> openTelemetryContext = 
event.metadata().openTelemetryContext();
-    if (!openTelemetryContext.isEmpty()) {
-      polarisEvent.setAdditionalProperties(openTelemetryContext);
+  private static String resolveCatalogName(PolarisEvent event) {
+    return event
+        .attributes()
+        .get(EventAttributes.CATALOG_NAME)
+        .orElse(org.apache.polaris.core.entity.PolarisEvent.REALM_SCOPED);
+  }
+
+  /**
+   * Maps an event's category to a {@link ResourceType} for indexing. 
Exhaustive over all {@link
+   * PolarisEventType.Category} values: any new category added to the enum 
must be classified here
+   * explicitly. The {@code default} arm is intentionally absent so that 
adding a category produces
+   * a compile-time error rather than silently routing to {@link 
ResourceType#REALM}.
+   */
+  private static ResourceType resolveResourceType(PolarisEventType eventType) {
+    return switch (eventType.category()) {
+      case TABLE, GENERIC_TABLE -> ResourceType.TABLE;
+      case VIEW -> ResourceType.VIEW;
+      case NAMESPACE -> ResourceType.NAMESPACE;
+      case CATALOG -> ResourceType.CATALOG;
+      case POLICY,
+          PRINCIPAL,
+          PRINCIPAL_ROLE,
+          CATALOG_ROLE,
+          CREDENTIAL,
+          TRANSACTION,
+          NOTIFICATION,
+          CONFIG,
+          TASK_EXECUTION,
+          RATE_LIMITING ->
+          ResourceType.REALM;
+    };
+  }
+
+  private static String resolveResourceIdentifier(
+      PolarisEvent event, ResourceType resourceType, String catalogName) {
+    return switch (resourceType) {
+      case TABLE -> resolveTableResourceIdentifier(event, catalogName);
+      case VIEW -> resolveViewResourceIdentifier(event, catalogName);
+      case NAMESPACE -> resolveNamespaceResourceIdentifier(event, catalogName);
+      case CATALOG -> resolveCatalogResourceIdentifier(event, catalogName);
+      case REALM -> resolveRealmResourceIdentifier(event);
+    };
+  }
+
+  private static String resolveTableResourceIdentifier(PolarisEvent event, 
String catalogName) {

Review Comment:
   The logic in this method is solid, but strikes me as overly convoluted. It 
probably hints at something that could be optimized on the event emission side.
   
   For instance, why have both `TABLE_IDENTIFIER` and `TABLE_NAME` attributes? 
I briefly checked all the call sites that use `TABLE_NAME` and it seems doable 
to use `TABLE_IDENTIFIER` instead. By using `TABLE_IDENTIFIER` consistently we 
could eliminate the `resolveNamespaceAndTableName` step below.
   
   For events with a `CREATE_TABLE_REQUEST` or `REGISTER_TABLE_REQUEST` 
attribute, we could also emit a `TABLE_IDENTIFIER` attribute as well.
   
   Moreover, we could consider that most (all?) events target _some_ polaris 
entity. Having a global attribute `TARGET_ENTITY_ID` would also eliminate most 
of the logic here, while also benefitting to other listeners as well (as 
opposed to any logic done here, which only benefits to the persistence 
listener). We could leverage `EventSanitizer` for this – see my other comments.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListeners.java:
##########
@@ -86,12 +88,14 @@ private void deliverEvent(
       PolarisEvent event, String listenerName, PolarisEventListener listener) {
     LOGGER.debug("Delivering {} event to listener '{}' ({})", event.type(), 
listenerName, listener);
     try {
+      PolarisEvent toDeliver =
+          (listener instanceof RawEventAccess) ? event : 
eventSanitizer.sanitize(event);

Review Comment:
   I agree that sanitizers should execute on the Vert.x event loop, as you did 
here – but this comes with an important caveat highlighted by #4648 : 
sanitizers **MUST NOT** block. Maybe we should add an `@implNote` to the 
`EventSanitizer` interface?



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -60,34 +55,256 @@ private void handleAfterCreateTable(PolarisEvent event) {
             event.type().name(),
             event.metadata().timestamp().toEpochMilli(),
             
event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
-            org.apache.polaris.core.entity.PolarisEvent.ResourceType.TABLE,
-            TableIdentifier.of(namespace, tableName).toString());
-    var additionalParameters =
-        ImmutableMap.<String, String>builder()
-            .put("table-uuid", tableMetadata.uuid())
-            .put("metadata", TableMetadataParser.toJson(tableMetadata));
-    additionalParameters.putAll(event.metadata().openTelemetryContext());
-    polarisEvent.setAdditionalProperties(additionalParameters.build());
+            resourceType,
+            resourceIdentifier);
+
+    Map<String, String> additionalProperties = 
buildAdditionalProperties(event);
+    if (!additionalProperties.isEmpty()) {
+      polarisEvent.setAdditionalProperties(additionalProperties);
+    }
+
     processEvent(event.metadata().realmId(), polarisEvent);
   }
 
-  private void handleAfterCreateCatalog(PolarisEvent event) {
-    Catalog catalog = event.attributes().getRequired(EventAttributes.CATALOG);
-    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
-        new org.apache.polaris.core.entity.PolarisEvent(
-            catalog.getName(),
-            event.metadata().eventId().toString(),
-            event.metadata().requestId().orElse(null),
-            event.type().name(),
-            event.metadata().timestamp().toEpochMilli(),
-            
event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
-            org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG,
-            catalog.getName());
-    Map<String, String> openTelemetryContext = 
event.metadata().openTelemetryContext();
-    if (!openTelemetryContext.isEmpty()) {
-      polarisEvent.setAdditionalProperties(openTelemetryContext);
+  private static String resolveCatalogName(PolarisEvent event) {
+    return event
+        .attributes()
+        .get(EventAttributes.CATALOG_NAME)
+        .orElse(org.apache.polaris.core.entity.PolarisEvent.REALM_SCOPED);
+  }
+
+  /**
+   * Maps an event's category to a {@link ResourceType} for indexing. 
Exhaustive over all {@link
+   * PolarisEventType.Category} values: any new category added to the enum 
must be classified here
+   * explicitly. The {@code default} arm is intentionally absent so that 
adding a category produces
+   * a compile-time error rather than silently routing to {@link 
ResourceType#REALM}.
+   */
+  private static ResourceType resolveResourceType(PolarisEventType eventType) {
+    return switch (eventType.category()) {
+      case TABLE, GENERIC_TABLE -> ResourceType.TABLE;
+      case VIEW -> ResourceType.VIEW;
+      case NAMESPACE -> ResourceType.NAMESPACE;
+      case CATALOG -> ResourceType.CATALOG;
+      case POLICY,
+          PRINCIPAL,
+          PRINCIPAL_ROLE,
+          CATALOG_ROLE,
+          CREDENTIAL,
+          TRANSACTION,
+          NOTIFICATION,
+          CONFIG,
+          TASK_EXECUTION,
+          RATE_LIMITING ->
+          ResourceType.REALM;
+    };
+  }
+
+  private static String resolveResourceIdentifier(
+      PolarisEvent event, ResourceType resourceType, String catalogName) {
+    return switch (resourceType) {
+      case TABLE -> resolveTableResourceIdentifier(event, catalogName);
+      case VIEW -> resolveViewResourceIdentifier(event, catalogName);
+      case NAMESPACE -> resolveNamespaceResourceIdentifier(event, catalogName);
+      case CATALOG -> resolveCatalogResourceIdentifier(event, catalogName);
+      case REALM -> resolveRealmResourceIdentifier(event);
+    };
+  }
+
+  private static String resolveTableResourceIdentifier(PolarisEvent event, 
String catalogName) {
+    EventAttributeMap attributes = event.attributes();
+
+    Optional<String> identifierFromTableAttribute =
+        
attributes.get(EventAttributes.TABLE_IDENTIFIER).map(TableIdentifier::toString);
+    if (identifierFromTableAttribute.isPresent()) {
+      return identifierFromTableAttribute.get();
     }
-    processEvent(event.metadata().realmId(), polarisEvent);
+
+    Optional<String> renameIdentifier = resolveRenameIdentifier(event);
+    if (renameIdentifier.isPresent()) {
+      return renameIdentifier.get();
+    }
+
+    Optional<String> namespaceAndTableName = 
resolveNamespaceAndTableName(attributes);
+    if (namespaceAndTableName.isPresent()) {
+      return namespaceAndTableName.get();
+    }
+
+    Optional<String> namespaceAndCreateTableName = 
resolveCreateTableIdentifier(attributes);
+    if (namespaceAndCreateTableName.isPresent()) {
+      return namespaceAndCreateTableName.get();
+    }
+
+    Optional<String> namespaceAndRegisterTableName = 
resolveRegisterTableIdentifier(attributes);
+    if (namespaceAndRegisterTableName.isPresent()) {
+      return namespaceAndRegisterTableName.get();
+    }
+
+    Optional<String> namespaceIdentifier =
+        attributes.get(EventAttributes.NAMESPACE).map(Namespace::toString);
+    if (namespaceIdentifier.isPresent()) {
+      return namespaceIdentifier.get();
+    }
+
+    return attributes
+        .get(EventAttributes.TABLE_NAME)
+        .orElseGet(() -> fallbackResourceIdentifier(event, catalogName));
+  }
+
+  private static String resolveViewResourceIdentifier(PolarisEvent event, 
String catalogName) {
+    EventAttributeMap attributes = event.attributes();
+    return attributes
+        .get(EventAttributes.VIEW_IDENTIFIER)
+        .map(TableIdentifier::toString)
+        .or(() -> resolveRenameIdentifier(event))
+        .or(
+            () ->
+                attributes
+                    .get(EventAttributes.NAMESPACE)
+                    .flatMap(
+                        namespace ->
+                            attributes
+                                .get(EventAttributes.VIEW_NAME)
+                                .map(
+                                    viewName ->
+                                        TableIdentifier.of(namespace, 
viewName).toString())))
+        .or(() -> attributes.get(EventAttributes.VIEW_NAME))
+        .orElseGet(() -> fallbackResourceIdentifier(event, catalogName));
+  }
+
+  private static Optional<String> resolveRenameIdentifier(PolarisEvent event) {
+    return event
+        .attributes()
+        .get(EventAttributes.RENAME_TABLE_REQUEST)
+        .map(
+            request ->
+                event.type().name().startsWith("AFTER_")
+                    ? request.destination().toString()
+                    : request.source().toString());
+  }
+
+  private static Optional<String> 
resolveNamespaceAndTableName(EventAttributeMap attributes) {
+    return attributes
+        .get(EventAttributes.NAMESPACE)
+        .flatMap(
+            namespace ->
+                attributes
+                    .get(EventAttributes.TABLE_NAME)
+                    .map(tableName -> TableIdentifier.of(namespace, 
tableName).toString()));
+  }
+
+  private static Optional<String> 
resolveCreateTableIdentifier(EventAttributeMap attributes) {
+    return attributes
+        .get(EventAttributes.NAMESPACE)
+        .flatMap(
+            namespace ->
+                attributes
+                    .get(EventAttributes.CREATE_TABLE_REQUEST)
+                    .map(CreateTableRequest::name)
+                    .map(tableName -> TableIdentifier.of(namespace, 
tableName).toString()));
+  }
+
+  private static Optional<String> 
resolveRegisterTableIdentifier(EventAttributeMap attributes) {
+    return attributes
+        .get(EventAttributes.NAMESPACE)
+        .flatMap(
+            namespace ->
+                attributes
+                    .get(EventAttributes.REGISTER_TABLE_REQUEST)
+                    .map(RegisterTableRequest::name)
+                    .map(tableName -> TableIdentifier.of(namespace, 
tableName).toString()));
+  }
+
+  private static String resolveNamespaceResourceIdentifier(PolarisEvent event, 
String catalogName) {
+    EventAttributeMap attributes = event.attributes();
+    return attributes
+        .get(EventAttributes.NAMESPACE)
+        .map(Namespace::toString)
+        .or(() -> attributes.get(EventAttributes.NAMESPACE_FQN))
+        .or(() -> attributes.get(EventAttributes.PARENT_NAMESPACE_FQN))
+        .orElseGet(() -> fallbackResourceIdentifier(event, catalogName));
+  }
+
+  private static String resolveCatalogResourceIdentifier(PolarisEvent event, 
String catalogName) {
+    return event
+        .attributes()
+        .get(EventAttributes.CATALOG_NAME)
+        .orElseGet(() -> fallbackResourceIdentifier(event, catalogName));
+  }
+
+  private static String resolveRealmResourceIdentifier(PolarisEvent event) {
+    return event.type().name();
+  }
+
+  private static String fallbackResourceIdentifier(PolarisEvent event, String 
catalogName) {
+    if 
(!org.apache.polaris.core.entity.PolarisEvent.REALM_SCOPED.equals(catalogName)) 
{
+      return catalogName;
+    }
+    return event.type().name();

Review Comment:
   That's a strange default value, especially given that the event type is 
already stored in the `event_type` column. Since `resource_identifier` is also 
`NOT NULL` we should either 1) insert a sentinel value like you did for 
`catalog_id` or 2) reject this event as malformed. Wdyt?



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -60,34 +55,256 @@ private void handleAfterCreateTable(PolarisEvent event) {
             event.type().name(),
             event.metadata().timestamp().toEpochMilli(),
             
event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
-            org.apache.polaris.core.entity.PolarisEvent.ResourceType.TABLE,
-            TableIdentifier.of(namespace, tableName).toString());
-    var additionalParameters =
-        ImmutableMap.<String, String>builder()
-            .put("table-uuid", tableMetadata.uuid())
-            .put("metadata", TableMetadataParser.toJson(tableMetadata));
-    additionalParameters.putAll(event.metadata().openTelemetryContext());
-    polarisEvent.setAdditionalProperties(additionalParameters.build());
+            resourceType,
+            resourceIdentifier);
+
+    Map<String, String> additionalProperties = 
buildAdditionalProperties(event);
+    if (!additionalProperties.isEmpty()) {
+      polarisEvent.setAdditionalProperties(additionalProperties);
+    }
+
     processEvent(event.metadata().realmId(), polarisEvent);
   }
 
-  private void handleAfterCreateCatalog(PolarisEvent event) {
-    Catalog catalog = event.attributes().getRequired(EventAttributes.CATALOG);
-    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
-        new org.apache.polaris.core.entity.PolarisEvent(
-            catalog.getName(),
-            event.metadata().eventId().toString(),
-            event.metadata().requestId().orElse(null),
-            event.type().name(),
-            event.metadata().timestamp().toEpochMilli(),
-            
event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
-            org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG,
-            catalog.getName());
-    Map<String, String> openTelemetryContext = 
event.metadata().openTelemetryContext();
-    if (!openTelemetryContext.isEmpty()) {
-      polarisEvent.setAdditionalProperties(openTelemetryContext);
+  private static String resolveCatalogName(PolarisEvent event) {
+    return event
+        .attributes()
+        .get(EventAttributes.CATALOG_NAME)
+        .orElse(org.apache.polaris.core.entity.PolarisEvent.REALM_SCOPED);
+  }
+
+  /**
+   * Maps an event's category to a {@link ResourceType} for indexing. 
Exhaustive over all {@link
+   * PolarisEventType.Category} values: any new category added to the enum 
must be classified here
+   * explicitly. The {@code default} arm is intentionally absent so that 
adding a category produces
+   * a compile-time error rather than silently routing to {@link 
ResourceType#REALM}.
+   */
+  private static ResourceType resolveResourceType(PolarisEventType eventType) {
+    return switch (eventType.category()) {
+      case TABLE, GENERIC_TABLE -> ResourceType.TABLE;
+      case VIEW -> ResourceType.VIEW;
+      case NAMESPACE -> ResourceType.NAMESPACE;
+      case CATALOG -> ResourceType.CATALOG;
+      case POLICY,
+          PRINCIPAL,
+          PRINCIPAL_ROLE,
+          CATALOG_ROLE,
+          CREDENTIAL,
+          TRANSACTION,
+          NOTIFICATION,
+          CONFIG,
+          TASK_EXECUTION,
+          RATE_LIMITING ->
+          ResourceType.REALM;
+    };
+  }
+
+  private static String resolveResourceIdentifier(
+      PolarisEvent event, ResourceType resourceType, String catalogName) {
+    return switch (resourceType) {
+      case TABLE -> resolveTableResourceIdentifier(event, catalogName);
+      case VIEW -> resolveViewResourceIdentifier(event, catalogName);
+      case NAMESPACE -> resolveNamespaceResourceIdentifier(event, catalogName);
+      case CATALOG -> resolveCatalogResourceIdentifier(event, catalogName);
+      case REALM -> resolveRealmResourceIdentifier(event);
+    };
+  }
+
+  private static String resolveTableResourceIdentifier(PolarisEvent event, 
String catalogName) {
+    EventAttributeMap attributes = event.attributes();
+
+    Optional<String> identifierFromTableAttribute =
+        
attributes.get(EventAttributes.TABLE_IDENTIFIER).map(TableIdentifier::toString);
+    if (identifierFromTableAttribute.isPresent()) {
+      return identifierFromTableAttribute.get();
     }
-    processEvent(event.metadata().realmId(), polarisEvent);
+
+    Optional<String> renameIdentifier = resolveRenameIdentifier(event);
+    if (renameIdentifier.isPresent()) {
+      return renameIdentifier.get();
+    }
+
+    Optional<String> namespaceAndTableName = 
resolveNamespaceAndTableName(attributes);
+    if (namespaceAndTableName.isPresent()) {
+      return namespaceAndTableName.get();
+    }
+
+    Optional<String> namespaceAndCreateTableName = 
resolveCreateTableIdentifier(attributes);
+    if (namespaceAndCreateTableName.isPresent()) {
+      return namespaceAndCreateTableName.get();
+    }
+
+    Optional<String> namespaceAndRegisterTableName = 
resolveRegisterTableIdentifier(attributes);
+    if (namespaceAndRegisterTableName.isPresent()) {
+      return namespaceAndRegisterTableName.get();
+    }
+
+    Optional<String> namespaceIdentifier =
+        attributes.get(EventAttributes.NAMESPACE).map(Namespace::toString);
+    if (namespaceIdentifier.isPresent()) {
+      return namespaceIdentifier.get();
+    }
+
+    return attributes
+        .get(EventAttributes.TABLE_NAME)
+        .orElseGet(() -> fallbackResourceIdentifier(event, catalogName));
+  }
+
+  private static String resolveViewResourceIdentifier(PolarisEvent event, 
String catalogName) {
+    EventAttributeMap attributes = event.attributes();
+    return attributes
+        .get(EventAttributes.VIEW_IDENTIFIER)
+        .map(TableIdentifier::toString)
+        .or(() -> resolveRenameIdentifier(event))
+        .or(
+            () ->
+                attributes
+                    .get(EventAttributes.NAMESPACE)
+                    .flatMap(
+                        namespace ->
+                            attributes
+                                .get(EventAttributes.VIEW_NAME)
+                                .map(
+                                    viewName ->
+                                        TableIdentifier.of(namespace, 
viewName).toString())))
+        .or(() -> attributes.get(EventAttributes.VIEW_NAME))
+        .orElseGet(() -> fallbackResourceIdentifier(event, catalogName));
+  }
+
+  private static Optional<String> resolveRenameIdentifier(PolarisEvent event) {
+    return event
+        .attributes()
+        .get(EventAttributes.RENAME_TABLE_REQUEST)
+        .map(
+            request ->
+                event.type().name().startsWith("AFTER_")
+                    ? request.destination().toString()
+                    : request.source().toString());
+  }
+
+  private static Optional<String> 
resolveNamespaceAndTableName(EventAttributeMap attributes) {
+    return attributes
+        .get(EventAttributes.NAMESPACE)
+        .flatMap(
+            namespace ->
+                attributes
+                    .get(EventAttributes.TABLE_NAME)
+                    .map(tableName -> TableIdentifier.of(namespace, 
tableName).toString()));
+  }
+
+  private static Optional<String> 
resolveCreateTableIdentifier(EventAttributeMap attributes) {
+    return attributes
+        .get(EventAttributes.NAMESPACE)
+        .flatMap(
+            namespace ->
+                attributes
+                    .get(EventAttributes.CREATE_TABLE_REQUEST)
+                    .map(CreateTableRequest::name)
+                    .map(tableName -> TableIdentifier.of(namespace, 
tableName).toString()));
+  }
+
+  private static Optional<String> 
resolveRegisterTableIdentifier(EventAttributeMap attributes) {
+    return attributes
+        .get(EventAttributes.NAMESPACE)
+        .flatMap(
+            namespace ->
+                attributes
+                    .get(EventAttributes.REGISTER_TABLE_REQUEST)
+                    .map(RegisterTableRequest::name)
+                    .map(tableName -> TableIdentifier.of(namespace, 
tableName).toString()));
+  }
+
+  private static String resolveNamespaceResourceIdentifier(PolarisEvent event, 
String catalogName) {
+    EventAttributeMap attributes = event.attributes();
+    return attributes
+        .get(EventAttributes.NAMESPACE)
+        .map(Namespace::toString)
+        .or(() -> attributes.get(EventAttributes.NAMESPACE_FQN))
+        .or(() -> attributes.get(EventAttributes.PARENT_NAMESPACE_FQN))
+        .orElseGet(() -> fallbackResourceIdentifier(event, catalogName));
+  }
+
+  private static String resolveCatalogResourceIdentifier(PolarisEvent event, 
String catalogName) {
+    return event
+        .attributes()
+        .get(EventAttributes.CATALOG_NAME)
+        .orElseGet(() -> fallbackResourceIdentifier(event, catalogName));
+  }
+
+  private static String resolveRealmResourceIdentifier(PolarisEvent event) {
+    return event.type().name();
+  }
+
+  private static String fallbackResourceIdentifier(PolarisEvent event, String 
catalogName) {
+    if 
(!org.apache.polaris.core.entity.PolarisEvent.REALM_SCOPED.equals(catalogName)) 
{
+      return catalogName;
+    }
+    return event.type().name();
+  }
+
+  private static Map<String, String> buildAdditionalProperties(PolarisEvent 
event) {
+    Map<String, String> additionalProperties =
+        new LinkedHashMap<>(event.metadata().openTelemetryContext());
+    event.attributes().forEach((key, value) -> 
additionalProperties.putAll(prune(key, value)));
+    return additionalProperties;
+  }
+
+  /**
+   * Reduces an attribute value to a bounded string representation suitable 
for storage in the
+   * persistence event row. Inlined here (rather than exposed as a swappable 
bean) because pruning
+   * is a serialization concern of this listener only — other listeners are 
free to render the same
+   * value differently.
+   */
+  private static Map<String, String> prune(AttributeKey<?> key, Object value) {
+    if (value instanceof String || value instanceof Number || value instanceof 
Boolean) {
+      return Map.of(key.name(), value.toString());
+    }
+
+    if (value instanceof Namespace namespace) {
+      return Map.of(key.name(), namespace.toString());
+    }
+
+    if (value instanceof TableIdentifier tableIdentifier) {
+      return Map.of(key.name(), tableIdentifier.toString());
+    }
+
+    if (key.equals(EventAttributes.TABLE_METADATA) && value instanceof 
TableMetadata metadata) {
+      return pruneTableMetadata(metadata);
+    }
+
+    if (key.equals(EventAttributes.LOAD_TABLE_RESPONSE)
+        && value instanceof LoadTableResponse response) {
+      return pruneTableMetadata(response.tableMetadata());
+    }
+
+    if (key.equals(EventAttributes.RENAME_TABLE_REQUEST)
+        && value instanceof RenameTableRequest request) {
+      Map<String, String> result = new LinkedHashMap<>();
+      result.put("rename_source", request.source().toString());

Review Comment:
   I wonder if these shouldn't be standard attributes emitted on every rename 
operation? Every listener dealing with a rename-table event will have to 
compute these values.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/DefaultEventSanitizer.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import io.quarkus.arc.DefaultBean;
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.polaris.core.admin.model.Catalog;
+
+/**
+ * Default implementation of {@link EventSanitizer}. Drops attributes whose 
keys are on the denylist
+ * and derives safe attributes (e.g., {@code CATALOG_NAME} extracted from a 
denied {@code CATALOG}
+ * object) from the originals. The built-in denylist covers attributes that 
carry credentials or
+ * internal secrets and may be extended via {@code 
polaris.event-listener.denylisted-attributes}.
+ *
+ * <p>Operators who need a different denial or derivation policy should 
replace this bean.
+ */
+@ApplicationScoped
+@DefaultBean
+public class DefaultEventSanitizer implements EventSanitizer {
+
+  // These carry full domain objects containing credentials or internal 
secrets:
+  // PRINCIPAL - may contain client IDs, internal identity metadata
+  // UPDATE_PRINCIPAL_REQUEST - credential rotation data, secret changes
+  // CATALOG - includes StorageConfigInfo with cloud credentials (AWS keys, 
Azure tokens, etc.)
+  // NOTIFICATION_REQUEST - contains metadata file locations and potentially 
signed URLs
+  static final Set<AttributeKey<?>> DEFAULT_DENYLIST =
+      Set.of(
+          EventAttributes.PRINCIPAL,
+          EventAttributes.UPDATE_PRINCIPAL_REQUEST,
+          EventAttributes.CATALOG,
+          EventAttributes.NOTIFICATION_REQUEST);
+
+  @Inject PolarisEventListenerConfiguration configuration;
+
+  private Set<AttributeKey<?>> denylist;
+
+  DefaultEventSanitizer() {
+    this(DEFAULT_DENYLIST);
+  }
+
+  DefaultEventSanitizer(Set<AttributeKey<?>> denylist) {
+    this.denylist = Set.copyOf(denylist);
+  }
+
+  @PostConstruct
+  void initializeFromConfiguration() {
+    if (configuration == null) {
+      return;
+    }
+    Set<String> configuredNames =
+        configuration.denylistedAttributes().orElse(Collections.emptySet());
+    if (!configuredNames.isEmpty()) {
+      this.denylist = resolveAdditionalDenylist(configuredNames);
+    }
+  }
+
+  @Override
+  public PolarisEvent sanitize(PolarisEvent event) {
+    EventAttributeMap filtered = new EventAttributeMap();
+    event
+        .attributes()
+        .forEach(
+            (key, value) -> {
+              if (!denylist.contains(key)) {
+                putUnchecked(filtered, key, value);
+              }
+            });
+    extractDerivedAttributes(event, filtered);
+    return new PolarisEvent(event.type(), event.metadata(), filtered);
+  }
+
+  public static Set<AttributeKey<?>> resolveAdditionalDenylist(Set<String> 
configuredNames) {
+    Set<AttributeKey<?>> resolved = new LinkedHashSet<>(DEFAULT_DENYLIST);
+    List<String> unknown = new ArrayList<>();
+
+    for (String name : configuredNames) {
+      EventAttributes.findByName(name).ifPresentOrElse(resolved::add, () -> 
unknown.add(name));
+    }
+
+    if (!unknown.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Unknown event attributes in denylist configuration: " + unknown);
+    }
+
+    return Set.copyOf(resolved);
+  }
+
+  private static void extractDerivedAttributes(PolarisEvent event, 
EventAttributeMap filtered) {

Review Comment:
   Oh I just looked at the 
`shouldDeriveCatalogNameFromCatalogObjectDuringFiltering` test and I think I 
get it. But it's too rigid imho. We could do the same for table or view 
metadata, etc.
   
   Imho a better approach is the one I outlined in another comment: a sanitizer 
would first enrich the event with derived attributes (e.g. catalog ID from 
catalog), then a second sanitizer would remove the catalog attribute.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/DefaultEventSanitizer.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import io.quarkus.arc.DefaultBean;
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.polaris.core.admin.model.Catalog;
+
+/**
+ * Default implementation of {@link EventSanitizer}. Drops attributes whose 
keys are on the denylist
+ * and derives safe attributes (e.g., {@code CATALOG_NAME} extracted from a 
denied {@code CATALOG}
+ * object) from the originals. The built-in denylist covers attributes that 
carry credentials or
+ * internal secrets and may be extended via {@code 
polaris.event-listener.denylisted-attributes}.
+ *
+ * <p>Operators who need a different denial or derivation policy should 
replace this bean.

Review Comment:
   That's a bit rigid, as I stated above. I think we could achieve the same 
without requiring users to modify Polaris and rebuild it.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/DefaultEventSanitizer.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import io.quarkus.arc.DefaultBean;
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.polaris.core.admin.model.Catalog;
+
+/**
+ * Default implementation of {@link EventSanitizer}. Drops attributes whose 
keys are on the denylist
+ * and derives safe attributes (e.g., {@code CATALOG_NAME} extracted from a 
denied {@code CATALOG}
+ * object) from the originals. The built-in denylist covers attributes that 
carry credentials or
+ * internal secrets and may be extended via {@code 
polaris.event-listener.denylisted-attributes}.
+ *
+ * <p>Operators who need a different denial or derivation policy should 
replace this bean.
+ */
+@ApplicationScoped
+@DefaultBean
+public class DefaultEventSanitizer implements EventSanitizer {
+
+  // These carry full domain objects containing credentials or internal 
secrets:
+  // PRINCIPAL - may contain client IDs, internal identity metadata
+  // UPDATE_PRINCIPAL_REQUEST - credential rotation data, secret changes
+  // CATALOG - includes StorageConfigInfo with cloud credentials (AWS keys, 
Azure tokens, etc.)
+  // NOTIFICATION_REQUEST - contains metadata file locations and potentially 
signed URLs
+  static final Set<AttributeKey<?>> DEFAULT_DENYLIST =
+      Set.of(
+          EventAttributes.PRINCIPAL,
+          EventAttributes.UPDATE_PRINCIPAL_REQUEST,
+          EventAttributes.CATALOG,
+          EventAttributes.NOTIFICATION_REQUEST);
+
+  @Inject PolarisEventListenerConfiguration configuration;
+
+  private Set<AttributeKey<?>> denylist;
+
+  DefaultEventSanitizer() {
+    this(DEFAULT_DENYLIST);
+  }
+
+  DefaultEventSanitizer(Set<AttributeKey<?>> denylist) {
+    this.denylist = Set.copyOf(denylist);
+  }
+
+  @PostConstruct
+  void initializeFromConfiguration() {
+    if (configuration == null) {
+      return;
+    }
+    Set<String> configuredNames =
+        configuration.denylistedAttributes().orElse(Collections.emptySet());
+    if (!configuredNames.isEmpty()) {
+      this.denylist = resolveAdditionalDenylist(configuredNames);
+    }
+  }
+
+  @Override
+  public PolarisEvent sanitize(PolarisEvent event) {
+    EventAttributeMap filtered = new EventAttributeMap();
+    event
+        .attributes()
+        .forEach(
+            (key, value) -> {
+              if (!denylist.contains(key)) {
+                putUnchecked(filtered, key, value);
+              }
+            });
+    extractDerivedAttributes(event, filtered);
+    return new PolarisEvent(event.type(), event.metadata(), filtered);
+  }
+
+  public static Set<AttributeKey<?>> resolveAdditionalDenylist(Set<String> 
configuredNames) {
+    Set<AttributeKey<?>> resolved = new LinkedHashSet<>(DEFAULT_DENYLIST);
+    List<String> unknown = new ArrayList<>();
+
+    for (String name : configuredNames) {
+      EventAttributes.findByName(name).ifPresentOrElse(resolved::add, () -> 
unknown.add(name));
+    }
+
+    if (!unknown.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Unknown event attributes in denylist configuration: " + unknown);
+    }
+
+    return Set.copyOf(resolved);
+  }
+
+  private static void extractDerivedAttributes(PolarisEvent event, 
EventAttributeMap filtered) {
+    if (!filtered.contains(EventAttributes.CATALOG_NAME)) {
+      event
+          .attributes()
+          .get(EventAttributes.CATALOG)
+          .map(Catalog::getName)
+          .filter(name -> !name.isBlank())
+          .ifPresent(name -> filtered.put(EventAttributes.CATALOG_NAME, name));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> void putUnchecked(EventAttributeMap map, AttributeKey<T> 
key, Object value) {
+    map.put(key, (T) value);
+  }

Review Comment:
   nit: Slightly better:
   
   ```suggestion
     private static <T> void putUnchecked(EventAttributeMap map, 
AttributeKey<T> key, Object value) {
       map.put(key, key.cast(value));
     }
   ```
   
   (But you still need the method – you can't inline it.)



##########
runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java:
##########
@@ -240,4 +247,42 @@ private EventAttributes() {}
       new AttributeKey<>("detach_policy_request", DetachPolicyRequest.class);
   public static final AttributeKey<GetApplicablePoliciesResponse> 
GET_APPLICABLE_POLICIES_RESPONSE =
       new AttributeKey<>("get_applicable_policies_response", 
GetApplicablePoliciesResponse.class);
+
+  public static Optional<AttributeKey<?>> findByName(String name) {
+    return Optional.ofNullable(AttributeLookupHolder.ALL_BY_NAME.get(name));
+  }
+
+  public static Map<String, AttributeKey<?>> allByName() {
+    return AttributeLookupHolder.ALL_BY_NAME;
+  }
+
+  private static Map<String, AttributeKey<?>> collectAllByName() {

Review Comment:
   Nice addition, and the lazy init is 👌 



##########
runtime/service/src/main/java/org/apache/polaris/service/events/DefaultEventSanitizer.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import io.quarkus.arc.DefaultBean;
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.polaris.core.admin.model.Catalog;
+
+/**
+ * Default implementation of {@link EventSanitizer}. Drops attributes whose 
keys are on the denylist
+ * and derives safe attributes (e.g., {@code CATALOG_NAME} extracted from a 
denied {@code CATALOG}
+ * object) from the originals. The built-in denylist covers attributes that 
carry credentials or
+ * internal secrets and may be extended via {@code 
polaris.event-listener.denylisted-attributes}.
+ *
+ * <p>Operators who need a different denial or derivation policy should 
replace this bean.
+ */
+@ApplicationScoped
+@DefaultBean
+public class DefaultEventSanitizer implements EventSanitizer {
+
+  // These carry full domain objects containing credentials or internal 
secrets:
+  // PRINCIPAL - may contain client IDs, internal identity metadata
+  // UPDATE_PRINCIPAL_REQUEST - credential rotation data, secret changes
+  // CATALOG - includes StorageConfigInfo with cloud credentials (AWS keys, 
Azure tokens, etc.)
+  // NOTIFICATION_REQUEST - contains metadata file locations and potentially 
signed URLs
+  static final Set<AttributeKey<?>> DEFAULT_DENYLIST =
+      Set.of(
+          EventAttributes.PRINCIPAL,
+          EventAttributes.UPDATE_PRINCIPAL_REQUEST,
+          EventAttributes.CATALOG,
+          EventAttributes.NOTIFICATION_REQUEST);
+
+  @Inject PolarisEventListenerConfiguration configuration;
+
+  private Set<AttributeKey<?>> denylist;
+
+  DefaultEventSanitizer() {
+    this(DEFAULT_DENYLIST);
+  }
+
+  DefaultEventSanitizer(Set<AttributeKey<?>> denylist) {
+    this.denylist = Set.copyOf(denylist);
+  }
+
+  @PostConstruct
+  void initializeFromConfiguration() {
+    if (configuration == null) {
+      return;
+    }
+    Set<String> configuredNames =
+        configuration.denylistedAttributes().orElse(Collections.emptySet());
+    if (!configuredNames.isEmpty()) {
+      this.denylist = resolveAdditionalDenylist(configuredNames);
+    }
+  }
+
+  @Override
+  public PolarisEvent sanitize(PolarisEvent event) {
+    EventAttributeMap filtered = new EventAttributeMap();
+    event
+        .attributes()
+        .forEach(
+            (key, value) -> {
+              if (!denylist.contains(key)) {
+                putUnchecked(filtered, key, value);
+              }
+            });
+    extractDerivedAttributes(event, filtered);
+    return new PolarisEvent(event.type(), event.metadata(), filtered);
+  }
+
+  public static Set<AttributeKey<?>> resolveAdditionalDenylist(Set<String> 
configuredNames) {
+    Set<AttributeKey<?>> resolved = new LinkedHashSet<>(DEFAULT_DENYLIST);
+    List<String> unknown = new ArrayList<>();
+
+    for (String name : configuredNames) {
+      EventAttributes.findByName(name).ifPresentOrElse(resolved::add, () -> 
unknown.add(name));
+    }
+
+    if (!unknown.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Unknown event attributes in denylist configuration: " + unknown);
+    }
+
+    return Set.copyOf(resolved);
+  }
+
+  private static void extractDerivedAttributes(PolarisEvent event, 
EventAttributeMap filtered) {

Review Comment:
   This looks odd, what is the purpose of this method?



##########
runtime/service/src/main/java/org/apache/polaris/service/events/DefaultEventSanitizer.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import io.quarkus.arc.DefaultBean;
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.polaris.core.admin.model.Catalog;
+
+/**
+ * Default implementation of {@link EventSanitizer}. Drops attributes whose 
keys are on the denylist
+ * and derives safe attributes (e.g., {@code CATALOG_NAME} extracted from a 
denied {@code CATALOG}
+ * object) from the originals. The built-in denylist covers attributes that 
carry credentials or
+ * internal secrets and may be extended via {@code 
polaris.event-listener.denylisted-attributes}.
+ *
+ * <p>Operators who need a different denial or derivation policy should 
replace this bean.
+ */
+@ApplicationScoped
+@DefaultBean
+public class DefaultEventSanitizer implements EventSanitizer {
+
+  // These carry full domain objects containing credentials or internal 
secrets:
+  // PRINCIPAL - may contain client IDs, internal identity metadata
+  // UPDATE_PRINCIPAL_REQUEST - credential rotation data, secret changes
+  // CATALOG - includes StorageConfigInfo with cloud credentials (AWS keys, 
Azure tokens, etc.)
+  // NOTIFICATION_REQUEST - contains metadata file locations and potentially 
signed URLs
+  static final Set<AttributeKey<?>> DEFAULT_DENYLIST =
+      Set.of(
+          EventAttributes.PRINCIPAL,
+          EventAttributes.UPDATE_PRINCIPAL_REQUEST,
+          EventAttributes.CATALOG,
+          EventAttributes.NOTIFICATION_REQUEST);
+
+  @Inject PolarisEventListenerConfiguration configuration;
+
+  private Set<AttributeKey<?>> denylist;
+
+  DefaultEventSanitizer() {
+    this(DEFAULT_DENYLIST);
+  }
+
+  DefaultEventSanitizer(Set<AttributeKey<?>> denylist) {
+    this.denylist = Set.copyOf(denylist);
+  }
+
+  @PostConstruct
+  void initializeFromConfiguration() {
+    if (configuration == null) {
+      return;
+    }
+    Set<String> configuredNames =
+        configuration.denylistedAttributes().orElse(Collections.emptySet());
+    if (!configuredNames.isEmpty()) {
+      this.denylist = resolveAdditionalDenylist(configuredNames);
+    }
+  }
+
+  @Override
+  public PolarisEvent sanitize(PolarisEvent event) {
+    EventAttributeMap filtered = new EventAttributeMap();
+    event
+        .attributes()
+        .forEach(
+            (key, value) -> {
+              if (!denylist.contains(key)) {
+                putUnchecked(filtered, key, value);
+              }
+            });
+    extractDerivedAttributes(event, filtered);
+    return new PolarisEvent(event.type(), event.metadata(), filtered);
+  }
+
+  public static Set<AttributeKey<?>> resolveAdditionalDenylist(Set<String> 
configuredNames) {
+    Set<AttributeKey<?>> resolved = new LinkedHashSet<>(DEFAULT_DENYLIST);

Review Comment:
   IIUC, the default denylist is always applied, regardless of user 
configuration. I wonder if this isn't too rigid, and also counter-intuitive? 
What if a super-secure, auditing listener wants to inspect 
`EventAttributes.PRINCIPAL`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to