adutra commented on code in PR #3293:
URL: https://github.com/apache/polaris/pull/3293#discussion_r2630943229


##########
runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import java.util.Map;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.CommitTransactionRequest;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.CreateViewRequest;
+import org.apache.iceberg.rest.requests.RegisterTableRequest;
+import org.apache.iceberg.rest.requests.RenameTableRequest;
+import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.LoadViewResponse;
+import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.polaris.core.admin.model.AddGrantRequest;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogRole;
+import org.apache.polaris.core.admin.model.CreatePrincipalRequest;
+import org.apache.polaris.core.admin.model.CreatePrincipalRoleRequest;
+import org.apache.polaris.core.admin.model.GrantResource;
+import org.apache.polaris.core.admin.model.Principal;
+import org.apache.polaris.core.admin.model.PrincipalRole;
+import org.apache.polaris.core.admin.model.PrincipalWithCredentials;
+import org.apache.polaris.core.admin.model.RevokeGrantRequest;
+import org.apache.polaris.core.admin.model.UpdateCatalogRequest;
+import org.apache.polaris.core.admin.model.UpdateCatalogRoleRequest;
+import org.apache.polaris.core.admin.model.UpdatePrincipalRequest;
+import org.apache.polaris.core.admin.model.UpdatePrincipalRoleRequest;
+import org.apache.polaris.core.entity.PolarisPrivilege;
+import org.apache.polaris.service.types.AttachPolicyRequest;
+import org.apache.polaris.service.types.CommitViewRequest;
+import org.apache.polaris.service.types.CreateGenericTableRequest;
+import org.apache.polaris.service.types.CreatePolicyRequest;
+import org.apache.polaris.service.types.DetachPolicyRequest;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.service.types.GetApplicablePoliciesResponse;
+import org.apache.polaris.service.types.LoadPolicyResponse;
+import org.apache.polaris.service.types.NotificationRequest;
+import org.apache.polaris.service.types.UpdatePolicyRequest;
+
+/**
+ * Standard attribute keys for Polaris events. These keys provide type-safe 
access to common event
+ * attributes and enable automatic pruning/filtering logic.
+ */
+public final class EventAttributes {
+  private EventAttributes() {}
+
+  // Catalog attributes
+  public static final AttributeKey<String> CATALOG_NAME =
+      AttributeKey.of("catalog_name", String.class);
+  public static final AttributeKey<Catalog> CATALOG = 
AttributeKey.of("catalog", Catalog.class);
+  public static final AttributeKey<UpdateCatalogRequest> 
UPDATE_CATALOG_REQUEST =
+      AttributeKey.of("update_catalog_request", UpdateCatalogRequest.class);
+
+  // Namespace attributes
+  public static final AttributeKey<Namespace> NAMESPACE =
+      AttributeKey.of("namespace", Namespace.class);
+  public static final AttributeKey<String> NAMESPACE_STRING =

Review Comment:
   Isn't this the same as `NAMESPACE_NAME` below? Or is this rather the 
"fully-qualified" namespace name while the other is the simple name?



##########
runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributes.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events;
+
+import java.util.Map;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.CommitTransactionRequest;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.CreateViewRequest;
+import org.apache.iceberg.rest.requests.RegisterTableRequest;
+import org.apache.iceberg.rest.requests.RenameTableRequest;
+import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.rest.responses.LoadViewResponse;
+import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
+import org.apache.iceberg.view.ViewMetadata;
+import org.apache.polaris.core.admin.model.AddGrantRequest;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogRole;
+import org.apache.polaris.core.admin.model.CreatePrincipalRequest;
+import org.apache.polaris.core.admin.model.CreatePrincipalRoleRequest;
+import org.apache.polaris.core.admin.model.GrantResource;
+import org.apache.polaris.core.admin.model.Principal;
+import org.apache.polaris.core.admin.model.PrincipalRole;
+import org.apache.polaris.core.admin.model.PrincipalWithCredentials;
+import org.apache.polaris.core.admin.model.RevokeGrantRequest;
+import org.apache.polaris.core.admin.model.UpdateCatalogRequest;
+import org.apache.polaris.core.admin.model.UpdateCatalogRoleRequest;
+import org.apache.polaris.core.admin.model.UpdatePrincipalRequest;
+import org.apache.polaris.core.admin.model.UpdatePrincipalRoleRequest;
+import org.apache.polaris.core.entity.PolarisPrivilege;
+import org.apache.polaris.service.types.AttachPolicyRequest;
+import org.apache.polaris.service.types.CommitViewRequest;
+import org.apache.polaris.service.types.CreateGenericTableRequest;
+import org.apache.polaris.service.types.CreatePolicyRequest;
+import org.apache.polaris.service.types.DetachPolicyRequest;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.service.types.GetApplicablePoliciesResponse;
+import org.apache.polaris.service.types.LoadPolicyResponse;
+import org.apache.polaris.service.types.NotificationRequest;
+import org.apache.polaris.service.types.UpdatePolicyRequest;
+
+/**
+ * Standard attribute keys for Polaris events. These keys provide type-safe 
access to common event
+ * attributes and enable automatic pruning/filtering logic.
+ */
+public final class EventAttributes {
+  private EventAttributes() {}
+
+  // Catalog attributes
+  public static final AttributeKey<String> CATALOG_NAME =
+      AttributeKey.of("catalog_name", String.class);
+  public static final AttributeKey<Catalog> CATALOG = 
AttributeKey.of("catalog", Catalog.class);
+  public static final AttributeKey<UpdateCatalogRequest> 
UPDATE_CATALOG_REQUEST =
+      AttributeKey.of("update_catalog_request", UpdateCatalogRequest.class);
+
+  // Namespace attributes
+  public static final AttributeKey<Namespace> NAMESPACE =
+      AttributeKey.of("namespace", Namespace.class);
+  public static final AttributeKey<String> NAMESPACE_STRING =
+      AttributeKey.of("namespace_string", String.class);
+  public static final AttributeKey<String> PARENT_NAMESPACE =
+      AttributeKey.of("parent_namespace", String.class);
+  public static final AttributeKey<CreateNamespaceRequest> 
CREATE_NAMESPACE_REQUEST =
+      AttributeKey.of("create_namespace_request", 
CreateNamespaceRequest.class);
+  public static final AttributeKey<UpdateNamespacePropertiesRequest>
+      UPDATE_NAMESPACE_PROPERTIES_REQUEST =
+          AttributeKey.of(
+              "update_namespace_properties_request", 
UpdateNamespacePropertiesRequest.class);
+  public static final AttributeKey<UpdateNamespacePropertiesResponse>
+      UPDATE_NAMESPACE_PROPERTIES_RESPONSE =
+          AttributeKey.of(
+              "update_namespace_properties_response", 
UpdateNamespacePropertiesResponse.class);
+
+  @SuppressWarnings("unchecked")
+  public static final AttributeKey<Map<String, String>> NAMESPACE_PROPERTIES =
+      (AttributeKey<Map<String, String>>)
+          (AttributeKey<?>) AttributeKey.of("namespace_properties", Map.class);

Review Comment:
   I'm wondering: in order to increase portability, shouldn't we restrict the 
types of attributes that we can put in this map? An attribute of type 
`Map<String, String>` is still OK, I guess, but what if the attribute doesn't 
have a clear serialized format, e.g. `Optional` or `Function`? Restricting to 
types that can be safely serialized to JSON (and maybe gRPC) would imho make 
things easier for listener implementors.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java:
##########
@@ -37,10 +40,15 @@ protected abstract void transformAndSendEvent(
       HashMap<String, Object> properties, PolarisEventMetadata metadata);
 
   @Override
-  public void 
onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) {
-    HashMap<String, Object> properties = new HashMap<>();
-    properties.put("event_type", event.getClass().getSimpleName());
-    properties.put("table_identifier", event.tableIdentifier().toString());
-    transformAndSendEvent(properties, event.metadata());
+  public void onEvent(PolarisEvent event) {

Review Comment:
   With the flattened hierarchy, we need to ask ourselves if this class is 
still useful, as opposed to just passing the `PolarisEvent` as is to the 
CloudWatch sink and letting it serialize the event without the need for an 
intermediary representation in the form of a Map.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -55,24 +82,38 @@ public void 
onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent ev
     processEvent(event.metadata().realmId(), polarisEvent);
   }
 
-  @Override
-  public void 
onAfterCreateCatalog(CatalogsServiceEvents.AfterCreateCatalogEvent event) {
-    PolarisEvent polarisEvent =
-        new PolarisEvent(
-            event.catalog().getName(),
+  private void handleAfterCreateCatalog(PolarisEvent event) {
+    Catalog catalog = getRequiredAttribute(event, EventAttributes.CATALOG);
+    if (catalog == null) {
+      return;
+    }
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new org.apache.polaris.core.entity.PolarisEvent(
+            catalog.getName(),
             event.metadata().eventId().toString(),
             event.metadata().requestId().orElse(null),
-            event.getClass().getSimpleName(),
+            event.type().name(),
             event.metadata().timestamp().toEpochMilli(),
             
event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
-            PolarisEvent.ResourceType.CATALOG,
-            event.catalog().getName());
+            org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG,
+            catalog.getName());
     Map<String, String> openTelemetryContext = 
event.metadata().openTelemetryContext();
     if (!openTelemetryContext.isEmpty()) {
       polarisEvent.setAdditionalProperties(openTelemetryContext);
     }
     processEvent(event.metadata().realmId(), polarisEvent);
   }
 
-  protected abstract void processEvent(String realmId, PolarisEvent event);
+  private <T> T getRequiredAttribute(PolarisEvent event, AttributeKey<T> key) {

Review Comment:
   This could be declared in `PolarisEvent`; I think it's going to be useful 
for many listeners, not just this one.



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -23,29 +23,56 @@
 import java.util.Map;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.polaris.core.admin.model.Catalog;
 import org.apache.polaris.core.auth.PolarisPrincipal;
-import org.apache.polaris.core.entity.PolarisEvent;
-import org.apache.polaris.service.events.CatalogsServiceEvents;
-import org.apache.polaris.service.events.IcebergRestCatalogEvents;
+import org.apache.polaris.service.events.AttributeKey;
+import org.apache.polaris.service.events.EventAttributes;
+import org.apache.polaris.service.events.PolarisEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class PolarisPersistenceEventListener implements 
PolarisEventListener {
 
-  // TODO: Ensure all events (except RateLimiter ones) call `processEvent`
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PolarisPersistenceEventListener.class);
 
   @Override
-  public void 
onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent event) {
-    TableMetadata tableMetadata = event.loadTableResponse().tableMetadata();
-    PolarisEvent polarisEvent =
-        new PolarisEvent(
-            event.catalogName(),
+  public void onEvent(PolarisEvent event) {
+    switch (event.type()) {
+      case AFTER_CREATE_TABLE -> handleAfterCreateTable(event);
+      case AFTER_CREATE_CATALOG -> handleAfterCreateCatalog(event);
+      default -> {
+        // Other events not handled by this listener
+      }
+    }
+  }
+
+  private void handleAfterCreateTable(PolarisEvent event) {

Review Comment:
   Again, with the flattened event hierarchy there is not a huge difference 
between these two `handleXYZ` methods. It would certainly be possible to create 
a unified `handleEvent` method that is valid for all events, thus saving us the 
hassle of writing 150+ methods (just a thought for later, not for this PR 
though).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to