adutra commented on code in PR #4064: URL: https://github.com/apache/polaris/pull/4064#discussion_r3050769630
########## runtime/service/src/test/java/org/apache/polaris/service/events/PolarisEventListenersTest.java: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.common.collect.ImmutableMap; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import io.smallrye.common.annotation.Identifier; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.polaris.service.events.listeners.PolarisEventListener; +import org.junit.jupiter.api.Test; + +@QuarkusTest +@TestProfile(PolarisEventListenersTest.PolarisEventListenersTestProfile.class) +public class PolarisEventListenersTest { + static final Set<PolarisEventType> CATALOG_EVENTS = + Arrays.stream(PolarisEventType.values()) + .filter(e -> e.category() == PolarisEventType.Category.CATALOG) + .collect(Collectors.toSet()); + + static final CountDownLatch latch = new CountDownLatch(5 + 3 * CATALOG_EVENTS.size()); Review Comment: This latch looks a bit fragile here imo (the latch size in particular looks a bit "magic"). Would it be possible to use Awaitility instead? ########## runtime/service/src/test/java/org/apache/polaris/service/events/PolarisEventListenersTest.java: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.common.collect.ImmutableMap; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import io.smallrye.common.annotation.Identifier; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.polaris.service.events.listeners.PolarisEventListener; +import org.junit.jupiter.api.Test; + +@QuarkusTest +@TestProfile(PolarisEventListenersTest.PolarisEventListenersTestProfile.class) +public class PolarisEventListenersTest { + static final Set<PolarisEventType> CATALOG_EVENTS = + Arrays.stream(PolarisEventType.values()) + .filter(e -> e.category() == PolarisEventType.Category.CATALOG) + .collect(Collectors.toSet()); + + static final CountDownLatch latch = new CountDownLatch(5 + 3 * CATALOG_EVENTS.size()); + + private abstract static class FilteringEventListener implements PolarisEventListener { + private final Predicate<PolarisEvent> predicate; + + List<PolarisEvent> expectedEvents = new ArrayList<>(); Review Comment: I would go with `CopyOnWriteArrayList` here – events can be delivered concurrently. ########## runtime/service/src/test/java/org/apache/polaris/service/events/PolarisEventListenersTest.java: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.google.common.collect.ImmutableMap; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import io.smallrye.common.annotation.Identifier; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.polaris.service.events.listeners.PolarisEventListener; +import org.junit.jupiter.api.Test; + +@QuarkusTest +@TestProfile(PolarisEventListenersTest.PolarisEventListenersTestProfile.class) +public class PolarisEventListenersTest { + static final Set<PolarisEventType> CATALOG_EVENTS = + Arrays.stream(PolarisEventType.values()) + .filter(e -> e.category() == PolarisEventType.Category.CATALOG) + .collect(Collectors.toSet()); + + static final CountDownLatch latch = new CountDownLatch(5 + 3 * CATALOG_EVENTS.size()); + + private abstract static class FilteringEventListener implements PolarisEventListener { + private final Predicate<PolarisEvent> predicate; + + List<PolarisEvent> expectedEvents = new ArrayList<>(); + List<PolarisEvent> unexpectedEvents = new ArrayList<>(); + + FilteringEventListener(Predicate<PolarisEvent> predicate) { + this.predicate = predicate; + } + + @Override + public void onEvent(PolarisEvent event) { + if (predicate.test(event)) { + expectedEvents.add(event); + } else { + unexpectedEvents.add(event); + } + latch.countDown(); + } + } + + @Singleton + @Identifier("after-send-event-listener") + public static class AfterSendEventListener extends FilteringEventListener { + AfterSendEventListener() { + super(event -> event.type() == PolarisEventType.AFTER_SEND_NOTIFICATION); + } + } + + @Singleton + @Identifier("before-send-event-listener") + public static class BeforeSendEventListener extends FilteringEventListener { + BeforeSendEventListener() { + super(event -> event.type() == PolarisEventType.BEFORE_SEND_NOTIFICATION); + } + } + + @Singleton + @Identifier("consume-all-listener") + public static class ConsumeAllEventListener implements PolarisEventListener { + List<PolarisEvent> consumedEvents = new ArrayList<>(); + + @Override + public void onEvent(PolarisEvent event) { + consumedEvents.add(event); + latch.countDown(); + } + } + + @Singleton + @Identifier("consume-only-catalog-events-listener") + public static class ConsumeOnlyCatalogEventsListener extends FilteringEventListener { + ConsumeOnlyCatalogEventsListener() { + super(event -> event.type().category() == PolarisEventType.Category.CATALOG); + } + } + + @Singleton + @Identifier("consume-catalog-and-after-notification-events-listener") + public static class ConsumeCatalogAndNotificationEventsListener extends FilteringEventListener { + ConsumeCatalogAndNotificationEventsListener() { + super( + event -> + event.type().category() == PolarisEventType.Category.CATALOG + || event.type() == PolarisEventType.AFTER_SEND_NOTIFICATION); + } + } + + public static class PolarisEventListenersTestProfile implements QuarkusTestProfile { + @Override + public Map<String, String> getConfigOverrides() { + return ImmutableMap.<String, String>builder() + .put( + "polaris.event-listener.types", + "after-send-event-listener,before-send-event-listener,consume-all-listener,consume-only-catalog-events-listener,consume-catalog-and-after-notification-events-listener") + .put( + "polaris.event-listener.after-send-event-listener.enabled-event-types", + "AFTER_SEND_NOTIFICATION") + .put( + "polaris.event-listener.before-send-event-listener.enabled-event-types", + "BEFORE_SEND_NOTIFICATION") + .put( + "polaris.event-listener.consume-only-catalog-events-listener.enabled-event-categories", + "CATALOG") + .put( + "polaris.event-listener.consume-catalog-and-after-notification-events-listener.enabled-event-categories", + "CATALOG") + .put( + "polaris.event-listener.consume-catalog-and-after-notification-events-listener.enabled-event-types", + "AFTER_SEND_NOTIFICATION") + .build(); + } + } + + @Inject PolarisEventDispatcher eventDispatcher; + + @Inject + @Identifier("after-send-event-listener") + PolarisEventListener afterSendEventListener; + + @Inject + @Identifier("before-send-event-listener") + PolarisEventListener beforeSendEventListener; + + @Inject + @Identifier("consume-all-listener") + PolarisEventListener consumeAllEventListener; + + @Inject + @Identifier("consume-only-catalog-events-listener") + PolarisEventListener consumeOnlyCatalogEventListener; + + @Inject + @Identifier("consume-catalog-and-after-notification-events-listener") + PolarisEventListener consumeCatalogAndNotificationEventsListener; + + @Test + public void testEventListenersGetNotified() throws InterruptedException { Review Comment: One of the tests we had in `PolarisEventListenersMultipleListenersTest` was testing that *two* listeners without any type filtering both receive all events. We don't have this specific test here. Not sure we absolutely must test this scenario, but I wanted to note the difference. ########## runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListeners.java: ########## @@ -41,13 +42,35 @@ public class PolarisEventListeners { @Inject @Any Instance<PolarisEventListener> eventListeners; @Inject PolarisEventListenerConfiguration configuration; + private final BitSet eventsByType = new BitSet(PolarisEventType.values().length); + public void onStartup(@Observes StartupEvent event) { - Set<String> listenerTypeSet = configuration.types().orElseGet(HashSet::new); + var listenerTypeSet = configuration.types().orElseGet(HashSet::new); for (String enabledEventListener : listenerTypeSet) { - PolarisEventListener listener = - eventListeners.select(Identifier.Literal.of(enabledEventListener)).get(); + var listenerConfiguration = configuration.listenerConfig().get(enabledEventListener); + var supportedTypes = PolarisEventType.values(); + if (listenerConfiguration != null) { + Set<PolarisEventType> enabledEventTypes = new HashSet<>(); + if (listenerConfiguration.enabledEventCategories().isPresent()) { + for (var enabledEventCategory : listenerConfiguration.enabledEventCategories().get()) { + enabledEventTypes.addAll(PolarisEventType.typesOfCategory(enabledEventCategory)); + } + } + if (listenerConfiguration.enabledEventTypes().isPresent()) { + enabledEventTypes.addAll(listenerConfiguration.enabledEventTypes().get()); + } + supportedTypes = enabledEventTypes.toArray(PolarisEventType[]::new); + } + var listener = eventListeners.select(Identifier.Literal.of(enabledEventListener)).get(); Handler<Message<PolarisEvent>> handler = e -> listener.onEvent(e.body()); Review Comment: I suggest extracting the delivery logic to a method and surround with exception handling + debug logging, because handlers are not supposed to throw: ```java Handler<Message<PolarisEvent>> handler = message -> deliverEvent(message.body(), enabledEventListener, listener); for (var polarisEventType : supportedTypes) { eventsByType.set(polarisEventType.ordinal()); eventBus.localConsumer(POLARIS_EVENT_CHANNEL + "." + polarisEventType, handler); } } } private void deliverEvent( PolarisEvent event, String listenerName, PolarisEventListener listener) { LOGGER.debug("Delivering {} event to listener '{}' ({})", event.type(), listenerName, listener); try { listener.onEvent(event); } catch (Exception e) { LOGGER.error( "Error while delivering {} event to listener '{}' ({})", event.type(), listenerName, listener, e); } finally { LOGGER.debug( "Delivered {} event to listener '{}' ({})", event.type(), listenerName, listener); } } ``` ########## runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListenerConfiguration.java: ########## @@ -41,4 +44,25 @@ public interface PolarisEventListenerConfiguration { * PolarisEventListener} identifier. */ Optional<Set<String>> types(); + + /** Configuration of each event listener type. */ + @WithParentName + Map<String, ListenerConfiguration> listenerConfig(); + + interface ListenerConfiguration { + /** + * Comma separated list of enabled event types. This event listener will only receive events of + * the selected types. If both the event types and event category configs are set, the listener Review Comment: I think we should also mention what the defaults are: - If no listener configuration exists, all events are enabled. - If a listener configuration exists, events are disabled by default (unless explicitlty listed in either `enabled-event-types` or `enabled-event-categories`). ########## runtime/service/src/main/java/org/apache/polaris/service/events/PolarisEventListeners.java: ########## @@ -41,13 +42,35 @@ public class PolarisEventListeners { @Inject @Any Instance<PolarisEventListener> eventListeners; @Inject PolarisEventListenerConfiguration configuration; + private final BitSet eventsByType = new BitSet(PolarisEventType.values().length); + public void onStartup(@Observes StartupEvent event) { - Set<String> listenerTypeSet = configuration.types().orElseGet(HashSet::new); + var listenerTypeSet = configuration.types().orElseGet(HashSet::new); for (String enabledEventListener : listenerTypeSet) { - PolarisEventListener listener = - eventListeners.select(Identifier.Literal.of(enabledEventListener)).get(); + var listenerConfiguration = configuration.listenerConfig().get(enabledEventListener); + var supportedTypes = PolarisEventType.values(); + if (listenerConfiguration != null) { + Set<PolarisEventType> enabledEventTypes = new HashSet<>(); + if (listenerConfiguration.enabledEventCategories().isPresent()) { + for (var enabledEventCategory : listenerConfiguration.enabledEventCategories().get()) { + enabledEventTypes.addAll(PolarisEventType.typesOfCategory(enabledEventCategory)); + } + } + if (listenerConfiguration.enabledEventTypes().isPresent()) { + enabledEventTypes.addAll(listenerConfiguration.enabledEventTypes().get()); + } + supportedTypes = enabledEventTypes.toArray(PolarisEventType[]::new); + } Review Comment: Minor suggestion: ```suggestion Set<PolarisEventType> supportedTypes; if (listenerConfiguration != null) { supportedTypes = EnumSet.noneOf(PolarisEventType.class); if (listenerConfiguration.enabledEventCategories().isPresent()) { for (var enabledEventCategory : listenerConfiguration.enabledEventCategories().get()) { supportedTypes.addAll(PolarisEventType.typesOfCategory(enabledEventCategory)); } } if (listenerConfiguration.enabledEventTypes().isPresent()) { supportedTypes.addAll(listenerConfiguration.enabledEventTypes().get()); } } else { // No configuration: enable all event types supportedTypes = EnumSet.allOf(PolarisEventType.class); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
