flyrain commented on code in PR #4225:
URL: https://github.com/apache/polaris/pull/4225#discussion_r3262281270
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -19,38 +19,37 @@
package org.apache.polaris.service.events.listeners;
-import com.google.common.collect.ImmutableMap;
+import jakarta.inject.Inject;
+import java.util.LinkedHashMap;
import java.util.Map;
-import org.apache.iceberg.TableMetadata;
-import org.apache.iceberg.TableMetadataParser;
+import java.util.Optional;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.rest.responses.LoadTableResponse;
-import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.RegisterTableRequest;
import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.service.events.EventAttributeMap;
import org.apache.polaris.service.events.EventAttributes;
+import org.apache.polaris.service.events.EventPayloadPruner;
import org.apache.polaris.service.events.PolarisEvent;
+import org.apache.polaris.service.events.PolarisEventType;
public abstract class PolarisPersistenceEventListener implements PolarisEventListener {
- @Override
- public void onEvent(PolarisEvent event) {
- switch (event.type()) {
- case AFTER_CREATE_TABLE -> handleAfterCreateTable(event);
- case AFTER_CREATE_CATALOG -> handleAfterCreateCatalog(event);
- default -> {
- // Other events not handled by this listener
- }
- }
+ @Inject EventPayloadPruner payloadPruner;
+
+ protected PolarisPersistenceEventListener() {}
+
+ PolarisPersistenceEventListener(EventPayloadPruner payloadPruner) {
+ this.payloadPruner = payloadPruner;
}
- private void handleAfterCreateTable(PolarisEvent event) {
- LoadTableResponse loadTableResponse =
- event.attributes().getRequired(EventAttributes.LOAD_TABLE_RESPONSE);
- TableMetadata tableMetadata = loadTableResponse.tableMetadata();
- String catalogName = event.attributes().getRequired(EventAttributes.CATALOG_NAME);
- Namespace namespace = event.attributes().getRequired(EventAttributes.NAMESPACE);
- String tableName = event.attributes().getRequired(EventAttributes.TABLE_NAME);
+ @Override
+ public void onEvent(PolarisEvent event) {
+ String catalogName = resolveCatalogName(event);
Review Comment:
It's confusing that a catalog name can now be a realm constant, like
`__realm__`. We should avoid that if possible. Is that because the catalog id
can't be null in the DB schema? Can we remove the constraint so that the
catalog id can be null for realm-level events? cc @adnanhemani
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/DefaultEventPayloadPruner.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import io.quarkus.arc.DefaultBean;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.RenameTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.polaris.service.events.AttributeKey;
+import org.apache.polaris.service.events.EventAttributes;
+import org.apache.polaris.service.events.EventPayloadPruner;
+
+@ApplicationScoped
+@DefaultBean
Review Comment:
Pruning is explicitly per-listener by design; the Javadoc on
EventPayloadPruner says so:
> While EventAttributeFilter decides which keys pass globally, this
> interface decides how a passing value is serialized, and is applied
> per-listener as a serialization concern.
So nothing in the framework enforces that any listener uses
EventPayloadPruner at all. A future CloudWatch listener could ignore the
injected pruner and write its own logic, and the dispatcher wouldn't know or
care. The "contract" suggested by the interface + CDI bean shape doesn't
actually exist; it's just a helper that one listener happens to inject.
That makes the current shape misleading on two fronts:
1. CDI suggests a global hook (like EventAttributeFilter, which is
enforced in deliverEvent()). But pruning isn't a global hook. Reading the code,
you'd reasonably assume that swapping the bean changes pruning everywhere, but
it doesn't.
2. One listener, one impl, no contract: the indirection earns nothing. A
static prune(key, value) util on the persistence listener (or a sibling helper
class) would do exactly the same job with less ceremony.
The two coherent designs are:
- Make it global. Move pruning into PolarisEventListeners.deliverEvent()
next to sanitize(). Then the CDI bean shape makes sense because every listener
gets pruned events.
- Keep it per-listener. Drop the interface + bean and inline it as a util
/ private method on PolarisPersistenceEventListener. The interface only earns
its keep when there's a second impl that needs to be selected.
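To make the second option concrete, here is a rough sketch of what "a static util, no interface, no bean" could look like. Everything in it is hypothetical: the `PayloadPruning` class, the plain `String` keys, and the length cap stand in for the real `AttributeKey` types and whatever pruning rules the PR actually applies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that would sit next to PolarisPersistenceEventListener.
final class PayloadPruning {
  // Assumed cap, for illustration only; not a value taken from the PR.
  static final int MAX_VALUE_LENGTH = 16;

  private PayloadPruning() {}

  // Truncates an over-long serialized value; short or null values pass through.
  // The key is accepted so that per-key rules could be added later.
  static String prune(String key, String value) {
    if (value == null || value.length() <= MAX_VALUE_LENGTH) {
      return value;
    }
    return value.substring(0, MAX_VALUE_LENGTH) + "...";
  }

  // Applies prune() to every entry, preserving insertion order.
  static Map<String, String> pruneAll(Map<String, String> attributes) {
    Map<String, String> pruned = new LinkedHashMap<>();
    attributes.forEach((key, value) -> pruned.put(key, prune(key, value)));
    return pruned;
  }
}
```

With this shape there is nothing to inject, so nobody reading the listener would expect a bean swap to change pruning anywhere else.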
##########
runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributeFilter.java:
##########
@@ -0,0 +1,30 @@
+
+package org.apache.polaris.service.events;
+
+/**
+ * Global event attribute filter applied before delivery to all listeners. Determines which {@link
+ * AttributeKey}s from an {@link EventAttributeMap} are safe to pass downstream. Sensitive
+ * attributes are stripped globally so all listeners receive only sanitized events by default.
+ */
+public interface EventAttributeFilter {
Review Comment:
`EventAttributeFilter` is too narrow for what's running at the choke point.
`PolarisEventListeners.sanitize()` does two things:
1. `attributeFilter.isAllowed(key)` per attribute
2. `extractDerivedAttributes()` (CATALOG → CATALOG_NAME)
Both are sanitization concerns that run at the same point for the same
reason, but one is an injected bean and the other is a private static method.
An operator who replaces `EventAttributeFilter` doesn't get to influence
derivation; that policy is locked inside the dispatcher.
Consider an `EventSanitizer` bean as the global, enforced abstraction:
```java
interface EventSanitizer {
  PolarisEvent sanitize(PolarisEvent event);
}
```
A `DefaultEventSanitizer` holds the denylist (or a sub-EventAttributeFilter
if you want to keep that extension point) plus the derivation logic. The
dispatcher just calls `sanitizer.sanitize(event)`. This also gives the future
"opt-in raw-event escape hatch" (TODO at PolarisEventListeners.java:85) a
natural home: it becomes a strategy choice inside the sanitizer rather than
another branch in deliverEvent.
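A rough, self-contained sketch of that default implementation, under loud assumptions: `SketchEvent` and `SketchCatalog` are simplified stand-ins for `PolarisEvent` and the admin-model `Catalog`, attributes are keyed by plain strings instead of `AttributeKey`, and deriving before filtering is my guess at the ordering, not something taken from the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for PolarisEvent (the real class has typed attributes and metadata).
record SketchEvent(String type, Map<String, Object> attributes) {}

// Simplified stand-in for the admin-model Catalog.
record SketchCatalog(String name) {}

interface SketchEventSanitizer {
  SketchEvent sanitize(SketchEvent event);
}

// Owns both the denylist and the derivation rule, so replacing the bean replaces both.
final class DefaultSketchEventSanitizer implements SketchEventSanitizer {
  private final Set<String> deniedKeys;

  DefaultSketchEventSanitizer(Set<String> deniedKeys) {
    this.deniedKeys = Set.copyOf(deniedKeys);
  }

  @Override
  public SketchEvent sanitize(SketchEvent event) {
    Map<String, Object> out = new LinkedHashMap<>();
    // Derivation (CATALOG -> CATALOG_NAME) runs first, so the derived value
    // survives even when the source attribute itself is denied below.
    if (event.attributes().get("CATALOG") instanceof SketchCatalog catalog) {
      out.put("CATALOG_NAME", catalog.name());
    }
    // Denylist filtering: copy every attribute whose key is not denied.
    event.attributes().forEach((key, value) -> {
      if (!deniedKeys.contains(key)) {
        out.put(key, value);
      }
    });
    return new SketchEvent(event.type(), out);
  }
}
```

The dispatcher would then hold a single `SketchEventSanitizer` reference and never need to know which of the two steps a custom implementation keeps, drops, or reorders.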
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -60,34 +59,187 @@ private void handleAfterCreateTable(PolarisEvent event) {
event.type().name(),
event.metadata().timestamp().toEpochMilli(),
event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
- org.apache.polaris.core.entity.PolarisEvent.ResourceType.TABLE,
- TableIdentifier.of(namespace, tableName).toString());
- var additionalParameters =
- ImmutableMap.<String, String>builder()
- .put("table-uuid", tableMetadata.uuid())
- .put("metadata", TableMetadataParser.toJson(tableMetadata));
- additionalParameters.putAll(event.metadata().openTelemetryContext());
- polarisEvent.setAdditionalProperties(additionalParameters.build());
+ resourceType,
+ resourceIdentifier);
+
+ Map<String, String> additionalProperties = buildAdditionalProperties(event);
+ if (!additionalProperties.isEmpty()) {
+ polarisEvent.setAdditionalProperties(additionalProperties);
+ }
+
processEvent(event.metadata().realmId(), polarisEvent);
}
- private void handleAfterCreateCatalog(PolarisEvent event) {
- Catalog catalog = event.attributes().getRequired(EventAttributes.CATALOG);
- org.apache.polaris.core.entity.PolarisEvent polarisEvent =
- new org.apache.polaris.core.entity.PolarisEvent(
- catalog.getName(),
- event.metadata().eventId().toString(),
- event.metadata().requestId().orElse(null),
- event.type().name(),
- event.metadata().timestamp().toEpochMilli(),
- event.metadata().user().map(PolarisPrincipal::getName).orElse(null),
- org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG,
- catalog.getName());
- Map<String, String> openTelemetryContext = event.metadata().openTelemetryContext();
- if (!openTelemetryContext.isEmpty()) {
- polarisEvent.setAdditionalProperties(openTelemetryContext);
+ private static String resolveCatalogName(PolarisEvent event) {
+ return event
+ .attributes()
+ .get(EventAttributes.CATALOG_NAME)
+ .orElse(org.apache.polaris.core.entity.PolarisEvent.REALM_SCOPED);
+ }
+
+ private static org.apache.polaris.core.entity.PolarisEvent.ResourceType resolveResourceType(
+ PolarisEventType eventType) {
+ return switch (eventType.category()) {
+ case TABLE, GENERIC_TABLE -> org.apache.polaris.core.entity.PolarisEvent.ResourceType.TABLE;
+ case VIEW -> org.apache.polaris.core.entity.PolarisEvent.ResourceType.VIEW;
+ case NAMESPACE -> org.apache.polaris.core.entity.PolarisEvent.ResourceType.NAMESPACE;
+ case CATALOG -> org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG;
+ default -> org.apache.polaris.core.entity.PolarisEvent.ResourceType.REALM;
Review Comment:
The `default -> REALM` arm in `resolveResourceType` is a silent catch-all. It
lumps PRINCIPAL, CATALOG_ROLE, PRINCIPAL_ROLE, POLICY, CREDENTIAL, TRANSACTION,
NOTIFICATION, CONFIG, etc. all under ResourceType.REALM. That's lossy for audit
queries (a policy change isn't realm-scoped), and it silently routes any future
Category value to REALM with no compile error. I'd suggest making the switch
exhaustive (no default) and explicitly mapping only the genuinely realm-scoped
categories to REALM.
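Roughly what I mean, as a sketch with shortened, hypothetical enums. `SketchCategory` and `SketchResourceType` are not the real types; the `PRINCIPAL` and `POLICY` resource types are invented for illustration, and treating `CONFIG` as the realm-scoped category is only an assumption.

```java
// Hypothetical, shortened stand-ins; the real enums have more values.
enum SketchCategory { TABLE, GENERIC_TABLE, VIEW, NAMESPACE, CATALOG, PRINCIPAL, POLICY, CONFIG }

enum SketchResourceType { TABLE, VIEW, NAMESPACE, CATALOG, PRINCIPAL, POLICY, REALM }

final class ResourceTypes {
  private ResourceTypes() {}

  // No default arm: a switch expression over an enum must be exhaustive, so
  // adding a SketchCategory value without a case here fails to compile.
  static SketchResourceType resolve(SketchCategory category) {
    return switch (category) {
      case TABLE, GENERIC_TABLE -> SketchResourceType.TABLE;
      case VIEW -> SketchResourceType.VIEW;
      case NAMESPACE -> SketchResourceType.NAMESPACE;
      case CATALOG -> SketchResourceType.CATALOG;
      case PRINCIPAL -> SketchResourceType.PRINCIPAL;
      case POLICY -> SketchResourceType.POLICY;
      // Assumed to be the only genuinely realm-scoped category in this sketch.
      case CONFIG -> SketchResourceType.REALM;
    };
  }
}
```

Whether the entity-level `ResourceType` should grow new values is a separate decision; the point here is only that every category is mapped on purpose.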
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -19,38 +19,37 @@
+ @Override
+ public void onEvent(PolarisEvent event) {
+ String catalogName = resolveCatalogName(event);
+ org.apache.polaris.core.entity.PolarisEvent.ResourceType resourceType =
Review Comment:
nit: we can import `ResourceType` to avoid the fully qualified reference
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/DefaultPersistenceEventHandler.java:
##########
@@ -0,0 +1,249 @@
+
+package org.apache.polaris.service.events.listeners;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.RenameTableRequest;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.Principal;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.entity.PolarisEvent.ResourceType;
+import org.apache.polaris.service.events.EventAttributeMap;
+import org.apache.polaris.service.events.EventAttributes;
+import org.apache.polaris.service.events.PolarisEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DefaultPersistenceEventHandler implements PersistenceEventHandler {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(DefaultPersistenceEventHandler.class);
+ private static final String SERIALIZATION_ERROR_PAYLOAD = "{\"error\":\"Serialization failed\"}";
+
+ static final String ALL_CATALOGS_SCOPE = "_all_catalogs";
+ static final String ROOT_NAMESPACE_SCOPE = "_root_namespace";
+ static final String TABLE_SCOPE = "_table_scope";
Review Comment:
These sentinels (`_table_scope`, `_view_scope`, `_all_catalogs`,
`_root_namespace`) get written directly into the `catalogId` /
`resourceIdentifier` columns. That works, but it overloads those fields:
downstream queries now need to filter the sentinels out. Consider a dedicated
scope `ResourceType` (e.g. `ROOT`/`SCOPE`) or a nullable column; at minimum,
document the convention on `org.apache.polaris.core.entity.PolarisEvent`.
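One possible shape for the "dedicated scope or nullable column" alternative, sketched with made-up names. `SketchScope` and `SketchEventRow` are hypothetical and do not correspond to anything in the entity model today; the idea is only that the identifier is absent for scope-level events instead of holding a sentinel string.

```java
import java.util.Optional;

// Hypothetical scope marker replacing string sentinels such as "_all_catalogs".
enum SketchScope { RESOURCE, ALL_CATALOGS, ROOT_NAMESPACE }

// Hypothetical row shape: scope-level events carry no identifier at all.
record SketchEventRow(SketchScope scope, Optional<String> resourceIdentifier) {

  // A normal event that targets one concrete resource.
  static SketchEventRow forResource(String identifier) {
    return new SketchEventRow(SketchScope.RESOURCE, Optional.of(identifier));
  }

  // A scope-level event; the identifier column would be NULL when persisted.
  static SketchEventRow forScope(SketchScope scope) {
    return new SketchEventRow(scope, Optional.empty());
  }
}
```

Queries for real resources can then filter on the scope (or on a non-null identifier) without having to know a list of magic strings.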
##########
runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListeners.java:
##########
@@ -80,8 +82,13 @@ public void onStartup(@Observes StartupEvent event) {
private void deliverEvent(
PolarisEvent event, String listenerName, PolarisEventListener listener) {
LOGGER.debug("Delivering {} event to listener '{}' ({})", event.type(), listenerName, listener);
+ // TODO: Add an opt-in mechanism (e.g., a marker interface or annotation) for specialized
Review Comment:
How will the "opt-in" work for a custom listener? Can you share more details?
We need to think it through now to avoid a big redesign later.