flyrain commented on code in PR #4225:
URL: https://github.com/apache/polaris/pull/4225#discussion_r3262281270


##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -19,38 +19,37 @@
 
 package org.apache.polaris.service.events.listeners;
 
-import com.google.common.collect.ImmutableMap;
+import jakarta.inject.Inject;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableMetadataParser;
+import java.util.Optional;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.rest.responses.LoadTableResponse;
-import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.RegisterTableRequest;
 import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.service.events.EventAttributeMap;
 import org.apache.polaris.service.events.EventAttributes;
+import org.apache.polaris.service.events.EventPayloadPruner;
 import org.apache.polaris.service.events.PolarisEvent;
+import org.apache.polaris.service.events.PolarisEventType;
 
 public abstract class PolarisPersistenceEventListener implements PolarisEventListener {
 
-  @Override
-  public void onEvent(PolarisEvent event) {
-    switch (event.type()) {
-      case AFTER_CREATE_TABLE -> handleAfterCreateTable(event);
-      case AFTER_CREATE_CATALOG -> handleAfterCreateCatalog(event);
-      default -> {
-        // Other events not handled by this listener
-      }
-    }
+  @Inject EventPayloadPruner payloadPruner;
+
+  protected PolarisPersistenceEventListener() {}
+
+  PolarisPersistenceEventListener(EventPayloadPruner payloadPruner) {
+    this.payloadPruner = payloadPruner;
   }
 
-  private void handleAfterCreateTable(PolarisEvent event) {
-    LoadTableResponse loadTableResponse =
-        event.attributes().getRequired(EventAttributes.LOAD_TABLE_RESPONSE);
-    TableMetadata tableMetadata = loadTableResponse.tableMetadata();
-    String catalogName = event.attributes().getRequired(EventAttributes.CATALOG_NAME);
-    Namespace namespace = event.attributes().getRequired(EventAttributes.NAMESPACE);
-    String tableName = event.attributes().getRequired(EventAttributes.TABLE_NAME);
+  @Override
+  public void onEvent(PolarisEvent event) {
+    String catalogName = resolveCatalogName(event);

Review Comment:
   It's confusing that a catalog name now can be a realm constant, like 
`__realm__`. We should avoid that if possible. Is that because catalog id 
couldn't be null in DB schema? Can we remove the constraint so that catalog id 
could be null in case of realm-level events? cc @adnanhemani 
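   For illustration, a minimal sketch of the nullable alternative, assuming the catalog id column can be relaxed to allow NULL. `PersistedEvent` and `CatalogScope` are hypothetical stand-ins for the entity `PolarisEvent` and the listener, and the `"catalog_name"` key stands in for `EventAttributes.CATALOG_NAME`.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the persisted entity PolarisEvent. catalogId is assumed
// nullable in the schema; null means "realm-level event, no catalog".
record PersistedEvent(String catalogId, String eventType, String resourceIdentifier) {
  Optional<String> catalog() {
    return Optional.ofNullable(catalogId);
  }
}

final class CatalogScope {
  private CatalogScope() {}

  // Replaces the REALM_SCOPED fallback: a missing CATALOG_NAME attribute stays missing.
  static Optional<String> resolveCatalogName(Map<String, String> attributes) {
    return Optional.ofNullable(attributes.get("catalog_name"));
  }

  // Builds the row to persist; realm-level events get a null catalog id, not a sentinel.
  static PersistedEvent toRow(
      String eventType, Map<String, String> attributes, String resourceIdentifier) {
    return new PersistedEvent(
        resolveCatalogName(attributes).orElse(null), eventType, resourceIdentifier);
  }
}
```

   Realm-level rows would then simply have no catalog id, so per-catalog queries never have to special-case a `__realm__` value.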



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/DefaultEventPayloadPruner.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import io.quarkus.arc.DefaultBean;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.RenameTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.polaris.service.events.AttributeKey;
+import org.apache.polaris.service.events.EventAttributes;
+import org.apache.polaris.service.events.EventPayloadPruner;
+
+@ApplicationScoped
+@DefaultBean

Review Comment:
   Pruning is explicitly per-listener by design; the Javadoc on 
`EventPayloadPruner` says so:
   
   > While EventAttributeFilter decides which keys pass globally, this 
interface decides how a passing value is serialized, and is applied 
per-listener as a serialization concern.
   
   So nothing in the framework enforces that any listener uses 
`EventPayloadPruner` at all. A future CloudWatch listener could ignore the 
injected pruner and write its own logic, and the dispatcher wouldn't know or care. 
The "contract" suggested by the interface + CDI bean shape doesn't actually 
exist; it's just a helper that one listener happens to inject.
   
     That makes the current shape misleading on two fronts:
   
     1. CDI suggests a global hook (like `EventAttributeFilter`, which is 
enforced in `deliverEvent()`). But pruning isn't a global hook. Reading the code, 
you'd reasonably assume that swapping the bean changes pruning everywhere; it 
doesn't.
     2. One listener, one impl, no contract = the indirection earns nothing. A 
static `prune(key, value)` util on the persistence listener (or a sibling helper 
class) would do exactly the same job with less ceremony.
   
     The two coherent designs are:
   
     - Make it global. Move pruning into PolarisEventListeners.deliverEvent() 
next to sanitize(). Then the CDI bean shape makes sense because every listener 
gets pruned events.
     - Keep it per-listener. Drop the interface + bean and inline it as a util 
/ private method on PolarisPersistenceEventListener. The interface only earns 
its keep when there's a second impl that needs to be selected.
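   For the per-listener option, a sketch of what "inline it as a util" could look like. The truncation rule and `MAX_VALUE_CHARS` are placeholders (the actual pruning rules in `DefaultEventPayloadPruner` aren't shown here), and `PersistencePayloads` is a hypothetical name; the point is the shape: a package-private static helper with no interface and no CDI bean.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical per-listener helper: no EventPayloadPruner interface, no CDI bean.
// It would sit next to PolarisPersistenceEventListener (package-private).
final class PersistencePayloads {
  // Placeholder limit; the real pruning rules are not shown in this review.
  static final int MAX_VALUE_CHARS = 32;
  static final String TRUNCATION_MARKER = "...(truncated)";

  private PersistencePayloads() {}

  // Same shape as the suggested prune(key, value); key is unused by the placeholder
  // rule but leaves room for per-attribute rules.
  static String prune(String key, Object value) {
    String text = String.valueOf(value);
    return text.length() <= MAX_VALUE_CHARS
        ? text
        : text.substring(0, MAX_VALUE_CHARS) + TRUNCATION_MARKER;
  }

  // Applies prune() to every attribute, keeping the original iteration order.
  static Map<String, String> pruneAll(Map<String, ?> attributes) {
    Map<String, String> pruned = new LinkedHashMap<>();
    attributes.forEach((key, value) -> pruned.put(key, prune(key, value)));
    return pruned;
  }
}
```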



##########
runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributeFilter.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+/**
+ * Global event attribute filter applied before delivery to all listeners. Determines which {@link
+ * AttributeKey}s from an {@link EventAttributeMap} are safe to pass downstream. Sensitive
+ * attributes are stripped globally so all listeners receive only sanitized events by default.
+ */
+public interface EventAttributeFilter {

Review Comment:
   `EventAttributeFilter` is too narrow for what's running at the choke point
   
     `PolarisEventListeners.sanitize()` does two things:
   
     1. `attributeFilter.isAllowed(key)` per attribute
     2. `extractDerivedAttributes()` (CATALOG → CATALOG_NAME)
   
   Both are sanitization concerns that run at the same point for the same 
reason, but one is an injected bean and the other is a private static method. 
An operator who replaces `EventAttributeFilter` can't influence 
derivation; that policy is locked inside the dispatcher.
   
   Consider an `EventSanitizer` bean as the global, enforced abstraction:
   
   ```java
   /** Called by the dispatcher before an event is delivered to listeners. */
   interface EventSanitizer {
     PolarisEvent sanitize(PolarisEvent event);
   }
   ```
   
   A `DefaultEventSanitizer` holds the denylist (or a sub-`EventAttributeFilter` 
if you want to keep that extension point) plus the derivation logic. The 
dispatcher just calls `sanitizer.sanitize(event)`. This also gives the future 
"opt-in raw-event escape hatch" (TODO at PolarisEventListeners.java:85) a 
natural home: it becomes a strategy choice inside the sanitizer rather than 
another branch in `deliverEvent()`.
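   A rough sketch of `DefaultEventSanitizer` under simplifying assumptions: `SimpleEvent` and `Catalog` are hypothetical stand-ins for `PolarisEvent` and the admin model `Catalog`, attribute keys are plain strings, and the denylist is a `Set<String>`. It runs derivation before filtering so that a denylisted `CATALOG` still yields `CATALOG_NAME`; whether that matches the current ordering inside `sanitize()` is an assumption.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for the admin model Catalog and the service PolarisEvent.
record Catalog(String name) {}

record SimpleEvent(String type, Map<String, Object> attributes) {}

// Same shape as the proposed EventSanitizer, over the stand-in event type.
interface EventSanitizer {
  SimpleEvent sanitize(SimpleEvent event);
}

final class DefaultEventSanitizer implements EventSanitizer {
  private final Set<String> denylist;

  DefaultEventSanitizer(Set<String> denylist) {
    this.denylist = Set.copyOf(denylist);
  }

  @Override
  public SimpleEvent sanitize(SimpleEvent event) {
    Map<String, Object> attributes = new LinkedHashMap<>(event.attributes());
    // Derivation: CATALOG -> CATALOG_NAME, done first so the name survives a denylisted CATALOG.
    if (attributes.get("catalog") instanceof Catalog catalog) {
      attributes.putIfAbsent("catalog_name", catalog.name());
    }
    // Filtering: drop denylisted keys (the role EventAttributeFilter plays today).
    attributes.keySet().removeIf(denylist::contains);
    return new SimpleEvent(event.type(), Collections.unmodifiableMap(attributes));
  }
}
```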



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -60,34 +59,187 @@ private void handleAfterCreateTable(PolarisEvent event) {
             event.type().name(),
             event.metadata().timestamp().toEpochMilli(),
             event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
-            org.apache.polaris.core.entity.PolarisEvent.ResourceType.TABLE,
-            TableIdentifier.of(namespace, tableName).toString());
-    var additionalParameters =
-        ImmutableMap.<String, String>builder()
-            .put("table-uuid", tableMetadata.uuid())
-            .put("metadata", TableMetadataParser.toJson(tableMetadata));
-    additionalParameters.putAll(event.metadata().openTelemetryContext());
-    polarisEvent.setAdditionalProperties(additionalParameters.build());
+            resourceType,
+            resourceIdentifier);
+
+    Map<String, String> additionalProperties = buildAdditionalProperties(event);
+    if (!additionalProperties.isEmpty()) {
+      polarisEvent.setAdditionalProperties(additionalProperties);
+    }
+
     processEvent(event.metadata().realmId(), polarisEvent);
   }
 
-  private void handleAfterCreateCatalog(PolarisEvent event) {
-    Catalog catalog = event.attributes().getRequired(EventAttributes.CATALOG);
-    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
-        new org.apache.polaris.core.entity.PolarisEvent(
-            catalog.getName(),
-            event.metadata().eventId().toString(),
-            event.metadata().requestId().orElse(null),
-            event.type().name(),
-            event.metadata().timestamp().toEpochMilli(),
-            event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
-            org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG,
-            catalog.getName());
-    Map<String, String> openTelemetryContext = event.metadata().openTelemetryContext();
-    if (!openTelemetryContext.isEmpty()) {
-      polarisEvent.setAdditionalProperties(openTelemetryContext);
+  private static String resolveCatalogName(PolarisEvent event) {
+    return event
+        .attributes()
+        .get(EventAttributes.CATALOG_NAME)
+        .orElse(org.apache.polaris.core.entity.PolarisEvent.REALM_SCOPED);
+  }
+
+  private static org.apache.polaris.core.entity.PolarisEvent.ResourceType resolveResourceType(
+      PolarisEventType eventType) {
+    return switch (eventType.category()) {
+      case TABLE, GENERIC_TABLE -> org.apache.polaris.core.entity.PolarisEvent.ResourceType.TABLE;
+      case VIEW -> org.apache.polaris.core.entity.PolarisEvent.ResourceType.VIEW;
+      case NAMESPACE -> org.apache.polaris.core.entity.PolarisEvent.ResourceType.NAMESPACE;
+      case CATALOG -> org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG;
+      default -> org.apache.polaris.core.entity.PolarisEvent.ResourceType.REALM;

Review Comment:
   The `default -> REALM` arm in `resolveResourceType` is a silent catch-all. It 
lumps PRINCIPAL, CATALOG_ROLE, PRINCIPAL_ROLE, POLICY, CREDENTIAL, TRANSACTION, 
NOTIFICATION, CONFIG, etc. all under `ResourceType.REALM`. That's lossy for audit 
queries (a policy change isn't realm-scoped), and it silently routes any future 
`Category` value to REALM with no compile error. I'd suggest making the switch 
exhaustive (no `default`) and explicitly mapping only the genuinely realm-scoped 
categories to REALM.
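   Roughly what that could look like, sketched with hypothetical stand-in enums. The `CATALOG_ROLE`, `PRINCIPAL`, `PRINCIPAL_ROLE` and `POLICY` resource types are assumptions (they aren't in the current `ResourceType`), treating only `CONFIG` as realm-scoped is an illustrative choice, and `CREDENTIAL`, `TRANSACTION` and `NOTIFICATION` are left out of the stand-in `Category` for brevity (a real exhaustive switch would need an arm for each).

```java
// Hypothetical stand-ins for PolarisEventType.Category and entity PolarisEvent.ResourceType.
enum Category {
  TABLE, GENERIC_TABLE, VIEW, NAMESPACE, CATALOG,
  CATALOG_ROLE, PRINCIPAL, PRINCIPAL_ROLE, POLICY, CONFIG
}

enum ResourceType {
  TABLE, VIEW, NAMESPACE, CATALOG, CATALOG_ROLE, PRINCIPAL, PRINCIPAL_ROLE, POLICY, REALM
}

final class ResourceTypes {
  private ResourceTypes() {}

  // Switch expression with no default arm: every Category constant must be handled.
  static ResourceType resolve(Category category) {
    return switch (category) {
      case TABLE, GENERIC_TABLE -> ResourceType.TABLE;
      case VIEW -> ResourceType.VIEW;
      case NAMESPACE -> ResourceType.NAMESPACE;
      case CATALOG -> ResourceType.CATALOG;
      case CATALOG_ROLE -> ResourceType.CATALOG_ROLE;
      case PRINCIPAL -> ResourceType.PRINCIPAL;
      case PRINCIPAL_ROLE -> ResourceType.PRINCIPAL_ROLE;
      case POLICY -> ResourceType.POLICY;
      // Only genuinely realm-scoped categories map to REALM (assumed: CONFIG).
      case CONFIG -> ResourceType.REALM;
    };
  }
}
```

   Because this is a switch expression over an enum with no `default`, adding a new `Category` constant without a matching arm fails compilation instead of silently mapping to `REALM`.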



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -19,38 +19,37 @@
 
 package org.apache.polaris.service.events.listeners;
 
-import com.google.common.collect.ImmutableMap;
+import jakarta.inject.Inject;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableMetadataParser;
+import java.util.Optional;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.rest.responses.LoadTableResponse;
-import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.RegisterTableRequest;
 import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.service.events.EventAttributeMap;
 import org.apache.polaris.service.events.EventAttributes;
+import org.apache.polaris.service.events.EventPayloadPruner;
 import org.apache.polaris.service.events.PolarisEvent;
+import org.apache.polaris.service.events.PolarisEventType;
 
 public abstract class PolarisPersistenceEventListener implements PolarisEventListener {
 
-  @Override
-  public void onEvent(PolarisEvent event) {
-    switch (event.type()) {
-      case AFTER_CREATE_TABLE -> handleAfterCreateTable(event);
-      case AFTER_CREATE_CATALOG -> handleAfterCreateCatalog(event);
-      default -> {
-        // Other events not handled by this listener
-      }
-    }
+  @Inject EventPayloadPruner payloadPruner;
+
+  protected PolarisPersistenceEventListener() {}
+
+  PolarisPersistenceEventListener(EventPayloadPruner payloadPruner) {
+    this.payloadPruner = payloadPruner;
   }
 
-  private void handleAfterCreateTable(PolarisEvent event) {
-    LoadTableResponse loadTableResponse =
-        event.attributes().getRequired(EventAttributes.LOAD_TABLE_RESPONSE);
-    TableMetadata tableMetadata = loadTableResponse.tableMetadata();
-    String catalogName = event.attributes().getRequired(EventAttributes.CATALOG_NAME);
-    Namespace namespace = event.attributes().getRequired(EventAttributes.NAMESPACE);
-    String tableName = event.attributes().getRequired(EventAttributes.TABLE_NAME);
+  @Override
+  public void onEvent(PolarisEvent event) {
+    String catalogName = resolveCatalogName(event);
+    org.apache.polaris.core.entity.PolarisEvent.ResourceType resourceType =

Review Comment:
   nit: we can import `ResourceType` to avoid spelling out its fully qualified name.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/DefaultPersistenceEventHandler.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.RenameTableRequest;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.Principal;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.entity.PolarisEvent.ResourceType;
+import org.apache.polaris.service.events.EventAttributeMap;
+import org.apache.polaris.service.events.EventAttributes;
+import org.apache.polaris.service.events.PolarisEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DefaultPersistenceEventHandler implements PersistenceEventHandler {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(DefaultPersistenceEventHandler.class);
+  private static final String SERIALIZATION_ERROR_PAYLOAD = "{\"error\":\"Serialization failed\"}";
+
+  static final String ALL_CATALOGS_SCOPE = "_all_catalogs";
+  static final String ROOT_NAMESPACE_SCOPE = "_root_namespace";
+  static final String TABLE_SCOPE = "_table_scope";

Review Comment:
   These sentinels (`_table_scope`, `_view_scope`, `_all_catalogs`, 
`_root_namespace`) get written directly into the `catalogId` / 
`resourceIdentifier` columns. That works, but it overloads those fields: downstream 
queries now need to filter the sentinels out. Consider a dedicated scope 
`ResourceType` (e.g. `ROOT`/`SCOPE`) or a nullable column; at minimum, document 
the convention on `org.apache.polaris.core.entity.PolarisEvent`.
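   A sketch of the dedicated-`ResourceType` option, assuming a hypothetical `ROOT` value plus nullable `catalogId`/`resourceIdentifier` for scope-level events (today's `_all_catalogs`, `_root_namespace`, etc.). `EventRow` and `EventQueries` are illustrative stand-ins for the persisted entity and a downstream consumer.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical: ROOT is an assumed new ResourceType for scope-level events that
// currently use sentinels such as "_all_catalogs" or "_root_namespace".
enum ResourceType { ROOT, CATALOG, NAMESPACE, TABLE, VIEW }

// catalogId and resourceIdentifier are null when the event has no such scope.
record EventRow(String catalogId, ResourceType resourceType, String resourceIdentifier) {}

final class EventQueries {
  private EventQueries() {}

  // Catalogs that have at least one event; nulls are skipped, no sentinel list needed.
  static List<String> catalogsWithEvents(List<EventRow> rows) {
    return rows.stream().map(EventRow::catalogId).filter(Objects::nonNull).distinct().toList();
  }

  // Scope-level events are identified by type rather than by magic identifier strings.
  static List<EventRow> scopeEvents(List<EventRow> rows) {
    return rows.stream().filter(r -> r.resourceType() == ResourceType.ROOT).toList();
  }
}
```

   With sentinels, `catalogsWithEvents` would also return `_all_catalogs` unless every consumer knows to exclude it.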



##########
runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListeners.java:
##########
@@ -80,8 +82,13 @@ public void onStartup(@Observes StartupEvent event) {
   private void deliverEvent(
       PolarisEvent event, String listenerName, PolarisEventListener listener) {
     LOGGER.debug("Delivering {} event to listener '{}' ({})", event.type(), listenerName, listener);
+    // TODO: Add an opt-in mechanism (e.g., a marker interface or annotation) for specialized

Review Comment:
   How will the "opt-in" work for custom listeners? Can you share more details? We 
need to think this through now to avoid a big redesign later.
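   To make the question concrete, here's one possible shape using the marker-interface option from the TODO. `Listener`, `RawEventListener`, `SimpleEvent` and `Dispatcher` are hypothetical, and the sanitizer is reduced to a `UnaryOperator`.

```java
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical minimal types; the real PolarisEvent and PolarisEventListener are richer.
record SimpleEvent(String type, Map<String, Object> attributes) {}

interface Listener {
  void onEvent(SimpleEvent event);
}

/** Hypothetical marker: implementing it opts a listener into receiving unsanitized events. */
interface RawEventListener extends Listener {}

final class Dispatcher {
  private final UnaryOperator<SimpleEvent> sanitizer;

  Dispatcher(UnaryOperator<SimpleEvent> sanitizer) {
    this.sanitizer = sanitizer;
  }

  // The opt-in is checked in exactly one place, next to where sanitization happens.
  void deliverEvent(SimpleEvent event, Listener listener) {
    SimpleEvent delivered = listener instanceof RawEventListener ? event : sanitizer.apply(event);
    listener.onEvent(delivered);
  }
}
```

   Even in this shape, questions remain: the listener author decides to opt in, so an operator has no veto unless there's also something like a config allowlist, and raw events here skip any derivation the sanitizer performs. Those are the parts I'd like to see settled before the TODO is implemented.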



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
